Skip to content

Commit

Permalink
Generalizing sql query processing
Browse files Browse the repository at this point in the history
  • Loading branch information
pdelewski committed May 9, 2024
1 parent def5925 commit 8cbb01b
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 23 deletions.
26 changes: 8 additions & 18 deletions quesma/clickhouse/quesma_communicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,11 @@ func (lm *LogManager) ProcessSelectQuery(ctx context.Context, table *Table, quer
if err != nil {
return nil, err
}
rows, err := executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), append(colNames, query.NonSchemaFields...), rowToScan)
if err == nil {
for _, row := range rows {
row.Index = table.Name
}
resultColumns, err := table.extractColumns(query, true)
if err != nil {
return nil, err
}
return rows, err
return executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), resultColumns, rowToScan)
}

// TODO add support for autocomplete for attributes, if we'll find it needed
Expand Down Expand Up @@ -114,22 +112,14 @@ func executeQuery(ctx context.Context, lm *LogManager, tableName string, queryAs
return res, err
}

func (lm *LogManager) ProcessAutocompleteSuggestionsQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) {
func (lm *LogManager) ProcessQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) {
colNames, err := table.extractColumns(query, false)
if err != nil {
return nil, err
}
rowToScan := make([]interface{}, len(colNames)+len(query.NonSchemaFields))
return executeQuery(ctx, lm, table.Name, query.String(), query.Fields, rowToScan)
}

func (lm *LogManager) ProcessGeneralAggregationQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) {
colNames, err := table.extractColumns(query, true)
if err != nil {
return nil, err
}
rowToScan := make([]interface{}, len(colNames))
return executeQuery(ctx, lm, table.Name, query.String(), colNames, rowToScan)
rowToScanLen := len(colNames) + len(query.NonSchemaFields)
rowToScan := make([]interface{}, rowToScanLen)
return executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), colNames, rowToScan)
}

// 'selectFields' are all values that we return from the query, both columns and non-schema fields,
Expand Down
28 changes: 25 additions & 3 deletions quesma/model/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,14 @@ func (q *Query) String() string {
func (q *Query) StringFromColumns(colNames []string) string {
var sb strings.Builder
sb.WriteString("SELECT ")
if q.IsDistinct {
sb.WriteString("DISTINCT ")
}
for i, field := range colNames {
if field != EmptyFieldSelection {
sb.WriteString(strconv.Quote(field))
} else {
if field == "*" || field == EmptyFieldSelection {
sb.WriteString(field)
} else {
sb.WriteString(strconv.Quote(field))
}
if i < len(colNames)-1 || len(q.NonSchemaFields) > 0 {
sb.WriteString(", ")
Expand All @@ -112,6 +115,25 @@ func (q *Query) StringFromColumns(colNames []string) string {
where = ""
}
sb.WriteString(" FROM " + q.FromClause + where + q.WhereClause + " " + strings.Join(q.SuffixClauses, " "))
if len(q.GroupByFields) > 0 {
sb.WriteString(" GROUP BY (")
for i, field := range q.GroupByFields {
sb.WriteString(field)
if i < len(q.GroupByFields)-1 {
sb.WriteString(", ")
}
}
sb.WriteString(")")

sb.WriteString(" ORDER BY (")
for i, field := range q.GroupByFields {
sb.WriteString(field)
if i < len(q.GroupByFields)-1 {
sb.WriteString(", ")
}
}
sb.WriteString(")")
}
return sb.String()
}

Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ func (q *QueryRunner) searchAggregationWorkerCommon(ctx context.Context, aggrega
for _, agg := range aggregations {
logger.InfoWithCtx(ctx).Msg(agg.String()) // I'd keep for now until aggregations work fully
sqls += agg.Query.String() + "\n"
rows, err := q.logManager.ProcessGeneralAggregationQuery(dbQueryCtx, table, &agg.Query)
rows, err := q.logManager.ProcessQuery(dbQueryCtx, table, &agg.Query)
if err != nil {
logger.ErrorWithCtx(ctx).Msg(err.Error())
continue
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/termsenum/terms_enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func handleTermsEnumRequest(ctx context.Context, reqBody []byte, qt *queryparser
dbQueryCtx, cancel := context.WithCancel(ctx)
// TODO this will be used to cancel goroutine that is executing the query
_ = cancel
if rows, err2 := qt.ClickhouseLM.ProcessAutocompleteSuggestionsQuery(dbQueryCtx, qt.Table, selectQuery); err2 != nil {
if rows, err2 := qt.ClickhouseLM.ProcessQuery(dbQueryCtx, qt.Table, selectQuery); err2 != nil {
logger.Error().Msgf("terms enum failed - error processing SQL query [%s]", err2)
result, err = json.Marshal(emptyTermsEnumResponse())
} else {
Expand Down

0 comments on commit 8cbb01b

Please sign in to comment.