From bfc34627a482314b28d2220c97929dc9421bd531 Mon Sep 17 00:00:00 2001 From: shen yushi Date: Fri, 24 Jan 2025 16:51:04 +0800 Subject: [PATCH] Groupby3 (#2493) ### What problem does this PR solve? Add having expression ### Type of change - [x] New Feature (non-breaking change which adds functionality) - [x] Test cases --- .../knn/knn_query_benchmark.cpp | 2 +- docs/references/http_api_reference.mdx | 4 ++ docs/references/pysdk_api_reference.md | 29 ++++++++ .../local_infinity/client.py | 4 ++ .../local_infinity/query_builder.py | 16 ++++- .../infinity_embedded/local_infinity/table.py | 6 ++ python/infinity_http.py | 9 +++ .../infinity/remote_thrift/client.py | 3 +- .../infinity/remote_thrift/query_builder.py | 14 +++- .../infinity/remote_thrift/table.py | 5 ++ python/test_pysdk/test_groupby.py | 48 ++++++++++++++ src/embedded_infinity/wrap_infinity.cpp | 38 ++++++++++- src/embedded_infinity/wrap_infinity.cppm | 2 + src/embedded_infinity_ext.cpp | 2 + src/main/infinity.cpp | 6 +- src/main/infinity.cppm | 4 +- src/network/http/http_search.cpp | 40 ++++++++++- src/network/infinity_thrift_service.cpp | 66 ++++++++++++++++++- src/unit_test/main/infinity.cpp | 2 +- src/unit_test/main/table.cpp | 8 ++- .../dql/aggregate/test_groupby_complex.slt | 17 +++++ 21 files changed, 312 insertions(+), 13 deletions(-) diff --git a/benchmark/local_infinity/knn/knn_query_benchmark.cpp b/benchmark/local_infinity/knn/knn_query_benchmark.cpp index e922b503eb..032b4ead67 100644 --- a/benchmark/local_infinity/knn/knn_query_benchmark.cpp +++ b/benchmark/local_infinity/knn/knn_query_benchmark.cpp @@ -221,7 +221,7 @@ int main(int argc, char *argv[]) { auto select_rowid_expr = new FunctionExpr(); select_rowid_expr->func_name_ = "row_id"; output_columns->emplace_back(select_rowid_expr); - auto result = infinity->Search(db_name, table_name, search_expr, nullptr, nullptr, nullptr, output_columns, nullptr, nullptr, nullptr, false); + auto result = infinity->Search(db_name, table_name, search_expr, nullptr, nullptr, nullptr, output_columns, nullptr, nullptr, nullptr, nullptr, false); { auto &cv = result.result_table_->GetDataBlockById(0)->column_vectors; auto &column = *cv[0]; diff --git a/docs/references/http_api_reference.mdx b/docs/references/http_api_reference.mdx index 2108f31fcc..2417b1b648 100644 --- a/docs/references/http_api_reference.mdx +++ b/docs/references/http_api_reference.mdx @@ -1777,6 +1777,7 @@ Searches for data in a specified table. The search can range from a simple vecto - `"fusion"`: `object` - `"sort"` : `object[]` - `"group_by"`: `string[]` + - `"having"`: `string` - `"limit"` : `string` - `"offset"` : `string` - `"option"` : `object` @@ -2017,6 +2018,9 @@ curl --request GET \ - `"group_by"`: `string[]` Indicates the expression to group by. +- `"having"`: `string` + Indicates the having condition. + - `"limit"` : `string` Indicates the limit row count. diff --git a/docs/references/pysdk_api_reference.md b/docs/references/pysdk_api_reference.md index 0970cf3f83..9d462c5476 100644 --- a/docs/references/pysdk_api_reference.md +++ b/docs/references/pysdk_api_reference.md @@ -1906,6 +1906,35 @@ table_obj.output(["c1", "avg(c1)", "count(c2)", "min(c3)", "max(c4)"]).group_by( --- +### having + +```python +table_object.having(expr) +``` + +Creates a filtering condition expression for the group-by result. + +#### Parameters + +##### having_expr: `str`, *Required* + +A string specifying the having expression. + +#### Returns + +- Success: An `infinity.local_infinity.table.LocalTable` object in embedded mode or an `infinity.remote_thrift.table.RemoteTable` object in client-server mode. +- Failure: `InfinityException` + - `error_code`: `int` A non-zero value indicating a specific error condition. + - `error_msg`: `str` A message providing additional details about the error. + +#### Examples + +```python +table_obj.output(["c1", "sum(c2)"]).group_by(["c1"]).having("sum(c2) > 10").to_df() +``` + +--- + ### limit ```python diff --git a/python/infinity_embedded/local_infinity/client.py b/python/infinity_embedded/local_infinity/client.py index 82f322ab27..f699f958e7 100644 --- a/python/infinity_embedded/local_infinity/client.py +++ b/python/infinity_embedded/local_infinity/client.py @@ -207,6 +207,7 @@ def search(self, highlight_list: list[WrapParsedExpr] = [], order_by_list: list[WrapOrderByExpr] = [], group_by_list: list[WrapParsedExpr] = [], + having_expr: WrapParsedExpr = None, total_hits_count_flag: bool = False, search_expr: WrapSearchExpr = None, where_expr: WrapParsedExpr = None, @@ -220,6 +221,7 @@ def search(self, highlight_list, order_by_list, group_by_list, + having_expr, total_hits_count_flag, search_expr, where_expr, @@ -236,6 +238,7 @@ def explain(self, order_by_list: list[WrapOrderByExpr] = [], group_by_list: list[WrapParsedExpr] = [], search_expr: WrapSearchExpr = None, + having_expr: WrapParsedExpr = None, where_expr: WrapParsedExpr = None, limit_expr: WrapParsedExpr = None, offset_expr: WrapParsedExpr = None): @@ -249,6 +252,7 @@ def explain(self, order_by_list, group_by_list, search_expr, + having_expr, where_expr, limit_expr, offset_expr), diff --git a/python/infinity_embedded/local_infinity/query_builder.py b/python/infinity_embedded/local_infinity/query_builder.py index 30b4cd3723..884816c354 100644 --- a/python/infinity_embedded/local_infinity/query_builder.py +++ b/python/infinity_embedded/local_infinity/query_builder.py @@ -41,6 +41,7 @@ def __init__( search: Optional[WrapSearchExpr], filter: Optional[WrapParsedExpr], group_by: Optional[List[WrapParsedExpr]], + having: Optional[WrapParsedExpr], limit: Optional[WrapParsedExpr], offset: Optional[WrapParsedExpr], sort: Optional[List[WrapOrderByExpr]], @@ -51,6 +52,7 @@ def __init__( self.search = search self.filter = filter self.group_by = group_by + self.having = having self.limit = limit self.offset = offset self.sort = sort @@ -65,12 +67,13 @@ def __init__( search: Optional[WrapSearchExpr], filter: Optional[WrapParsedExpr], group_by: Optional[List[WrapParsedExpr]], + having: Optional[WrapParsedExpr], limit: Optional[WrapParsedExpr], offset: Optional[WrapParsedExpr], sort: Optional[List[WrapOrderByExpr]], explain_type: Optional[BaseExplainType], ): - super().__init__(columns, highlight, search, filter, group_by, limit, offset, sort, None) + super().__init__(columns, highlight, search, filter, group_by, having, limit, offset, sort, None) self.explain_type = explain_type @@ -82,6 +85,7 @@ def __init__(self, table): self._search = None self._filter = None self._group_by = None + self._having = None self._limit = None self._offset = None self._sort = None @@ -93,6 +97,7 @@ def reset(self): self._search = None self._filter = None self._group_by = None + self._having = None self._limit = None self._offset = None self._sort = None @@ -421,7 +426,7 @@ def offset(self, offset: Optional[int]) -> InfinityLocalQueryBuilder: self._offset = offset_expr return self - def group_by(self, columns: List[str] | str): + def group_by(self, columns: List[str] | str) -> InfinityLocalQueryBuilder: group_by_list = [] if isinstance(columns, list): for column in columns: @@ -433,6 +438,11 @@ def group_by(self, columns: List[str] | str): self._group_by = group_by_list return self + def having(self, having: Optional[str]) -> InfinityLocalQueryBuilder: + having_expr = traverse_conditions(condition(having)) + self._having = having_expr + return self + def output(self, columns: Optional[list]) -> InfinityLocalQueryBuilder: self._columns = columns select_list: List[WrapParsedExpr] = [] @@ -700,6 +710,7 @@ def to_result(self) -> tuple[dict[str, list[Any]], dict[str, Any], {}]: search=self._search, filter=self._filter, group_by=self._group_by, + having = self._having, limit=self._limit, offset=self._offset, sort=self._sort, @@ -731,6 +742,7 @@ def explain(self, explain_type=ExplainType.kPhysical) -> Any: search=self._search, filter=self._filter, group_by=self._group_by, + having=self._having, limit=self._limit, offset=self._offset, explain_type=explain_type, diff --git a/python/infinity_embedded/local_infinity/table.py b/python/infinity_embedded/local_infinity/table.py index 025430e09f..39cbee1f98 100644 --- a/python/infinity_embedded/local_infinity/table.py +++ b/python/infinity_embedded/local_infinity/table.py @@ -385,6 +385,10 @@ def group_by(self, group_by_expr_list: Optional[List[str]] | Optional[str]): self.query_builder.group_by(group_by_expr_list) return self + def having(self, having: Optional[str]): + self.query_builder.having(having) + return self + def sort(self, order_by_expr_list: Optional[List[list[str, SortType]]]): for order_by_expr in order_by_expr_list: if len(order_by_expr) != 2: @@ -466,6 +470,7 @@ def _execute_query(self, query: Query): highlight_list=highlight, order_by_list=order_by_list, group_by_list=group_by_list, + having_expr=query.having, total_hits_count_flag=total_hits_count_flag, search_expr=query.search, where_expr=query.filter, @@ -498,6 +503,7 @@ def _explain_query(self, query: ExplainQuery) -> Any: highlight_list=highlight, order_by_list=order_by_list, group_by_list=group_by_list, + having_expr=query.having, search_expr=query.search, where_expr=query.filter, limit_expr=query.limit, diff --git a/python/infinity_http.py b/python/infinity_http.py index 6dbb88538d..141573e688 100644 --- a/python/infinity_http.py +++ b/python/infinity_http.py @@ -713,6 +713,7 @@ def __init__(self, output: list, table_http: table_http): self._search_exprs = [] self._sort = [] self._group_by = [] + self._having = [] self._limit = None self._offset = None self._option = None @@ -733,6 +734,8 @@ def select(self): tmp["sort"] = self._sort if len(self._group_by): tmp["group_by"] = self._group_by + if len(self._having): + tmp["having"] = self._having if self._limit is not None: tmp["limit"] = str(self._limit) if self._offset is not None: @@ -781,6 +784,8 @@ def explain(self, ExplainType=ExplainType.Physical): tmp["sort"] = self._sort if len(self._group_by): tmp["group_by"] = self._group_by + if len(self._having): + tmp["having"] = self._having if self._limit is not None: tmp["limit"] = self._limit if self._offset is not None: @@ -838,6 +843,10 @@ def offset(self, offset): def group_by(self, group_by_list): self._group_by = group_by_list return self + + def having(self, having_expr): + self._having = having_expr + return self def option(self, option: {}): # option_str = json.dumps(option) diff --git a/python/infinity_sdk/infinity/remote_thrift/client.py b/python/infinity_sdk/infinity/remote_thrift/client.py index 4e64e1d154..c949143d38 100644 --- a/python/infinity_sdk/infinity/remote_thrift/client.py +++ b/python/infinity_sdk/infinity/remote_thrift/client.py @@ -264,7 +264,7 @@ def export_data(self, db_name: str, table_name: str, file_name: str, export_opti @retry_wrapper def select(self, db_name: str, table_name: str, select_list, highlight_list, search_expr, - where_expr, group_by_list, limit_expr, offset_expr, order_by_list, total_hits_count): + where_expr, group_by_list, having_expr, limit_expr, offset_expr, order_by_list, total_hits_count): return self.client.Select(SelectRequest(session_id=self.session_id, db_name=db_name, table_name=table_name, @@ -273,6 +273,7 @@ def select(self, db_name: str, table_name: str, select_list, highlight_list, sea search_expr=search_expr, where_expr=where_expr, group_by_list=group_by_list, + having_expr=having_expr, limit_expr=limit_expr, offset_expr=offset_expr, order_by_list=order_by_list, diff --git a/python/infinity_sdk/infinity/remote_thrift/query_builder.py b/python/infinity_sdk/infinity/remote_thrift/query_builder.py index 300b01595a..4b969914be 100644 --- a/python/infinity_sdk/infinity/remote_thrift/query_builder.py +++ b/python/infinity_sdk/infinity/remote_thrift/query_builder.py @@ -45,6 +45,7 @@ def __init__( search: Optional[SearchExpr], filter: Optional[ParsedExpr], groupby: Optional[List[ParsedExpr]], + having: Optional[ParsedExpr], limit: Optional[ParsedExpr], offset: Optional[ParsedExpr], sort: Optional[List[OrderByExpr]], @@ -55,6 +56,7 @@ def __init__( self.search = search self.filter = filter self.groupby = groupby + self.having = having self.limit = limit self.offset = offset self.sort = sort @@ -69,12 +71,13 @@ def __init__( search: Optional[SearchExpr], filter: Optional[ParsedExpr], groupby: Optional[List[ParsedExpr]], + having: Optional[ParsedExpr], limit: Optional[ParsedExpr], offset: Optional[ParsedExpr], sort: Optional[List[OrderByExpr]], explain_type: Optional[ExplainType], ): - super().__init__(columns, highlight, search, filter, groupby, limit, offset, sort, False) + super().__init__(columns, highlight, search, filter, groupby, having, limit, offset, sort, False) self.explain_type = explain_type @@ -86,6 +89,7 @@ def __init__(self, table): self._search = None self._filter = None self._groupby = None + self._having = None self._limit = None self._offset = None self._sort = None @@ -97,6 +101,7 @@ def reset(self): self._search = None self._filter = None self._groupby = None + self._having = None self._limit = None self._offset = None self._sort = None @@ -349,6 +354,11 @@ def group_by(self, columns: List[str] | str) -> InfinityThriftQueryBuilder: group_by_list.append(parse_expr(maybe_parse(columns))) self._groupby = group_by_list return self + + def having(self, having: Optional[str]) -> InfinityThriftQueryBuilder: + having_expr = traverse_conditions(condition(having)) + self._having = having_expr + return self def output(self, columns: Optional[list]) -> InfinityThriftQueryBuilder: self._columns = columns @@ -538,6 +548,7 @@ def to_result(self) -> tuple[dict[str, list[Any]], dict[str, Any], {}]: search=self._search, filter=self._filter, groupby=self._groupby, + having=self._having, limit=self._limit, offset=self._offset, sort=self._sort, @@ -569,6 +580,7 @@ def explain(self, explain_type=ExplainType.Physical) -> Any: search=self._search, filter=self._filter, groupby=self._groupby, + having=self._having, limit=self._limit, offset=self._offset, sort=self._sort, diff --git a/python/infinity_sdk/infinity/remote_thrift/table.py b/python/infinity_sdk/infinity/remote_thrift/table.py index ac8672b16b..9f086340a3 100644 --- a/python/infinity_sdk/infinity/remote_thrift/table.py +++ b/python/infinity_sdk/infinity/remote_thrift/table.py @@ -404,6 +404,10 @@ def group_by(self, group_by_expr_list: Optional[List[str]] | Optional[str]): self.query_builder.group_by(group_by_expr_list) return self + def having(self, having_expr: Optional[str]): + self.query_builder.having(having_expr) + return self + def sort(self, order_by_expr_list: Optional[List[list[str, SortType]]]): for order_by_expr in order_by_expr_list: if len(order_by_expr) != 2: @@ -507,6 +511,7 @@ def _execute_query(self, query: Query) -> tuple[dict[str, list[Any]], dict[str, search_expr=query.search, where_expr=query.filter, group_by_list=query.groupby, + having_expr=query.having, limit_expr=query.limit, offset_expr=query.offset, order_by_list=query.sort, diff --git a/python/test_pysdk/test_groupby.py b/python/test_pysdk/test_groupby.py index 0a55db5258..89a0ef9394 100644 --- a/python/test_pysdk/test_groupby.py +++ b/python/test_pysdk/test_groupby.py @@ -540,6 +540,54 @@ def test_groupby_complex(self, suffix): gt.sort_values(by=gt.columns.tolist()).reset_index(drop=True), ) + res, extra_result = ( + table_obj.output(["c3", "sum(c1)", "sum(c2)"]) + .group_by("c3") + .having("sum(c1) >= 2") + .to_df() + ) + gt = pd.DataFrame( + { + "c3": [1.0, 2.0, 3.0, 4.0, 6.0, 8.0], + "sum(c1)": [2, 4, 2, 4, 2, 2], + "sum(c2)": [4, 3, 5, 4, 3, 2], + } + ).astype( + { + "c3": dtype("float32"), + "sum(c1)": dtype("int64"), + "sum(c2)": dtype("int64"), + } + ) + pd.testing.assert_frame_equal( + res.sort_values(by=res.columns.tolist()).reset_index(drop=True), + gt.sort_values(by=gt.columns.tolist()).reset_index(drop=True), + ) + + res, extra_result = ( + table_obj.output(["c3", "sum(c1)", "sum(c2)"]) + .group_by("c3") + .having("sum(c1) >= 2 and c3 > 3") + .to_df() + ) + gt = pd.DataFrame( + { + "c3": [4.0, 6.0, 8.0], + "sum(c1)": [4, 2, 2], + "sum(c2)": [4, 3, 2], + } + ).astype( + { + "c3": dtype("float32"), + "sum(c1)": dtype("int64"), + "sum(c2)": dtype("int64"), + } + ) + pd.testing.assert_frame_equal( + res.sort_values(by=res.columns.tolist()).reset_index(drop=True), + gt.sort_values(by=gt.columns.tolist()).reset_index(drop=True), + ) + res, extra_result = ( table_obj.output(["c3", "count(c3)", "sum(c1)", "sum(c2)"]) .group_by("c3") diff --git a/src/embedded_infinity/wrap_infinity.cpp b/src/embedded_infinity/wrap_infinity.cpp index d05a40a2e7..409615988c 100644 --- a/src/embedded_infinity/wrap_infinity.cpp +++ b/src/embedded_infinity/wrap_infinity.cpp @@ -1448,6 +1448,7 @@ WrapQueryResult WrapSearch(Infinity &instance, Vector highlight_list, Vector order_by_list, Vector group_by_list, + WrapParsedExpr *having_expr, bool total_hits_count_flag, WrapSearchExpr *wrap_search_expr, WrapParsedExpr *filter_expr, @@ -1586,6 +1587,21 @@ WrapQueryResult WrapSearch(Infinity &instance, } } + ParsedExpr *having = nullptr; + DeferFn defer_fn9([&]() { + if (having != nullptr) { + delete having; + having = nullptr; + } + }); + if (having_expr != nullptr) { + Status status; + having = having_expr->GetParsedExpr(status); + if (status.code_ != ErrorCode::kOk) { + return WrapQueryResult(status.code_, status.msg_->c_str()); + } + } + Vector *order_by_exprs = nullptr; DeferFn defer_fn6([&]() { if (order_by_exprs != nullptr) { @@ -1622,6 +1638,7 @@ WrapQueryResult WrapSearch(Infinity &instance, highlight, order_by_exprs, group_by_exprs, + having, total_hits_count_flag); search_expr = nullptr; filter = nullptr; @@ -1631,6 +1648,7 @@ WrapQueryResult WrapSearch(Infinity &instance, highlight = nullptr; order_by_exprs = nullptr; group_by_exprs = nullptr; + having = nullptr; if (!query_result.IsOk()) { return WrapQueryResult(query_result.ErrorCode(), query_result.ErrorMsg()); } @@ -1649,6 +1667,7 @@ WrapQueryResult WrapExplain(Infinity &instance, Vector highlight_list, Vector order_by_list, Vector group_by_list, + WrapParsedExpr *having_expr, WrapSearchExpr *wrap_search_expr, WrapParsedExpr *filter_expr, WrapParsedExpr *limit_expr, @@ -1786,6 +1805,21 @@ WrapQueryResult WrapExplain(Infinity &instance, } } + ParsedExpr *having = nullptr; + DeferFn defer_fn13([&]() { + if (having != nullptr) { + delete having; + having = nullptr; + } + }); + if (having_expr != nullptr) { + Status status; + having = having_expr->GetParsedExpr(status); + if (status.code_ != ErrorCode::kOk) { + return WrapQueryResult(status.code_, status.msg_->c_str()); + } + } + Vector *order_by_exprs = nullptr; DeferFn defer_fn6([&]() { if (order_by_exprs != nullptr) { @@ -1822,7 +1856,8 @@ WrapQueryResult WrapExplain(Infinity &instance, output_columns, highlight, order_by_exprs, - group_by_exprs); + group_by_exprs, + having); search_expr = nullptr; filter = nullptr; limit = nullptr; @@ -1831,6 +1866,7 @@ WrapQueryResult WrapExplain(Infinity &instance, highlight = nullptr; order_by_exprs = nullptr; group_by_exprs = nullptr; + having = nullptr; if (!query_result.IsOk()) { return WrapQueryResult(query_result.ErrorCode(), query_result.ErrorMsg()); } diff --git a/src/embedded_infinity/wrap_infinity.cppm b/src/embedded_infinity/wrap_infinity.cppm index 7093a2d19e..e206066a49 100644 --- a/src/embedded_infinity/wrap_infinity.cppm +++ b/src/embedded_infinity/wrap_infinity.cppm @@ -424,6 +424,7 @@ export WrapQueryResult WrapExplain(Infinity &instance, Vector highlight_list, Vector order_by_list, Vector group_by_list, + WrapParsedExpr *having_expr, WrapSearchExpr *wrap_search_expr, WrapParsedExpr *filter_expr, WrapParsedExpr *limit_expr, @@ -436,6 +437,7 @@ export WrapQueryResult WrapSearch(Infinity &instance, Vector highlight_list, Vector order_by_list, Vector group_by_list, + WrapParsedExpr *having_expr, bool total_hits_count_flag, WrapSearchExpr *wrap_search_expr = nullptr, WrapParsedExpr *where_expr = nullptr, diff --git a/src/embedded_infinity_ext.cpp b/src/embedded_infinity_ext.cpp index 28eb74031a..296b768417 100644 --- a/src/embedded_infinity_ext.cpp +++ b/src/embedded_infinity_ext.cpp @@ -317,6 +317,7 @@ NB_MODULE(embedded_infinity_ext, m) { nb::arg("highlight_list"), nb::arg("order_by_list"), nb::arg("group_by_list"), + nb::arg("having_expr") = nullptr, nb::arg("wrap_search_expr") = nullptr, nb::arg("where_expr") = nullptr, nb::arg("limit_expr") = nullptr, @@ -329,6 +330,7 @@ NB_MODULE(embedded_infinity_ext, m) { nb::arg("highlight_list"), nb::arg("order_by_list"), nb::arg("group_by_list"), + nb::arg("having_expr") = nullptr, nb::arg("total_hits_count_flag"), nb::arg("wrap_search_expr") = nullptr, nb::arg("where_expr") = nullptr, diff --git a/src/main/infinity.cpp b/src/main/infinity.cpp index cad2e3a424..1f2f229fcf 100644 --- a/src/main/infinity.cpp +++ b/src/main/infinity.cpp @@ -1045,7 +1045,8 @@ QueryResult Infinity::Explain(const String &db_name, Vector *output_columns, Vector *highlight_columns, Vector *order_by_list, - Vector *group_by_list) { + Vector *group_by_list, + ParsedExpr *having) { DeferFn free_output_columns([&]() { if (output_columns != nullptr) { for (auto &output_column : *output_columns) { @@ -1113,6 +1114,7 @@ QueryResult Infinity::Explain(const String &db_name, select_statement->offset_expr_ = offset; select_statement->order_by_list_ = order_by_list; select_statement->group_by_list_ = group_by_list; + select_statement->having_expr_ = having; explain_statement->statement_ = select_statement; @@ -1134,6 +1136,7 @@ QueryResult Infinity::Search(const String &db_name, Vector *highlight_columns, Vector *order_by_list, Vector *group_by_list, + ParsedExpr *having, bool total_hits_count_flag) { if (total_hits_count_flag) { if (limit == nullptr) { @@ -1207,6 +1210,7 @@ QueryResult Infinity::Search(const String &db_name, select_statement->offset_expr_ = offset; select_statement->order_by_list_ = order_by_list; select_statement->group_by_list_ = group_by_list; + select_statement->having_expr_ = having; select_statement->total_hits_count_flag_ = total_hits_count_flag; QueryResult result = query_context_ptr->QueryStatement(select_statement.get()); diff --git a/src/main/infinity.cppm b/src/main/infinity.cppm index 699e7ecc3f..30d23d7677 100644 --- a/src/main/infinity.cppm +++ b/src/main/infinity.cppm @@ -182,7 +182,8 @@ public: Vector *output_columns, Vector *highlight_columns, Vector *order_by_list, - Vector *group_by_list); + Vector *group_by_list, + ParsedExpr *having); QueryResult Search(const String &db_name, const String &table_name, @@ -194,6 +195,7 @@ public: Vector *highlight_columns, Vector *order_by_list, Vector *group_by_list, + ParsedExpr *having, bool total_hits_count_flag); QueryResult Optimize(const String &db_name, const String &table_name, OptimizeOptions optimize_options = OptimizeOptions{}); diff --git a/src/network/http/http_search.cpp b/src/network/http/http_search.cpp index 6dab54298a..5ef94e2a04 100644 --- a/src/network/http/http_search.cpp +++ b/src/network/http/http_search.cpp @@ -68,6 +68,7 @@ void HTTPSearch::Process(Infinity *infinity_ptr, Vector *highlight_columns{nullptr}; Vector *order_by_list{nullptr}; Vector *group_by_columns{nullptr}; + UniquePtr having{}; bool total_hits_count_flag{}; DeferFn defer_fn([&]() { if (output_columns != nullptr) { @@ -161,6 +162,16 @@ void HTTPSearch::Process(Infinity *infinity_ptr, if (group_by_columns == nullptr) { return; } + } else if (IsEqual(key, "having")) { + if (having != nullptr) { + response["error_code"] = ErrorCode::kInvalidExpression; + response["error_message"] = "More than one having field."; + return; + } + having = ParseFilter(elem.value(), http_status, response); + if (having == nullptr) { + return; + } } else if (IsEqual(key, "filter")) { if (filter) { @@ -254,6 +265,7 @@ void HTTPSearch::Process(Infinity *infinity_ptr, highlight_columns, order_by_list, group_by_columns, + having.release(), total_hits_count_flag); output_columns = nullptr; @@ -338,6 +350,8 @@ void HTTPSearch::Explain(Infinity *infinity_ptr, Vector *output_columns{nullptr}; Vector *highlight_columns{nullptr}; Vector *order_by_list{nullptr}; + Vector *group_by_columns{nullptr}; + UniquePtr having{nullptr}; DeferFn defer_fn([&]() { if (output_columns != nullptr) { for (auto &expr : *output_columns) { @@ -418,6 +432,28 @@ void HTTPSearch::Explain(Infinity *infinity_ptr, if (order_by_list == nullptr) { return; } + } else if (IsEqual(key, "group_by")) { + if (group_by_columns != nullptr) { + response["error_code"] = ErrorCode::kInvalidExpression; + response["error_message"] = "More than one group by field."; + return; + } + auto &group_by_list = elem.value(); + + group_by_columns = ParseOutput(group_by_list, http_status, response); + if (group_by_columns == nullptr) { + return; + } + } else if (IsEqual(key, "having")) { + if (having != nullptr) { + response["error_code"] = ErrorCode::kInvalidExpression; + response["error_message"] = "More than one having field."; + return; + } + having = ParseFilter(elem.value(), http_status, response); + if (having == nullptr) { + return; + } } else if (IsEqual(key, "filter")) { if (filter != nullptr) { @@ -497,11 +533,13 @@ void HTTPSearch::Explain(Infinity *infinity_ptr, output_columns, highlight_columns, order_by_list, - nullptr); + group_by_columns, + having.release()); output_columns = nullptr; highlight_columns = nullptr; order_by_list = nullptr; + group_by_columns = nullptr; if (result.IsOk()) { SizeT block_rows = result.result_table_->DataBlockCount(); for (SizeT block_id = 0; block_id < block_rows; ++block_id) { diff --git a/src/network/infinity_thrift_service.cpp b/src/network/infinity_thrift_service.cpp index 8d424160f0..bd79054627 100644 --- a/src/network/infinity_thrift_service.cpp +++ b/src/network/infinity_thrift_service.cpp @@ -655,6 +655,8 @@ void InfinityThriftService::Select(infinity_thrift_rpc::SelectResponse &response order_by_expr = nullptr; } } + + // group by Vector *group_by_list = nullptr; DeferFn defer_fn10([&]() { if (group_by_list != nullptr) { @@ -679,6 +681,22 @@ void InfinityThriftService::Select(infinity_thrift_rpc::SelectResponse &response } } + // having + ParsedExpr *having = nullptr; + DeferFn defer_fn12([&]() { + if (having != nullptr) { + delete having; + having = nullptr; + } + }); + if (request.__isset.having_expr == true) { + having = GetParsedExprFromProto(parsed_expr_status, request.having_expr); + if (!parsed_expr_status.ok()) { + ProcessStatus(response, parsed_expr_status); + return; + } + } + // auto end2 = std::chrono::steady_clock::now(); // phase_2_duration_ += end2 - start2; // @@ -694,6 +712,7 @@ void InfinityThriftService::Select(infinity_thrift_rpc::SelectResponse &response highlight_columns, order_by_list, group_by_list, + having, request.total_hits_count); output_columns = nullptr; highlight_columns = nullptr; @@ -703,6 +722,7 @@ void InfinityThriftService::Select(infinity_thrift_rpc::SelectResponse &response offset = nullptr; order_by_list = nullptr; group_by_list = nullptr; + having = nullptr; // auto end3 = std::chrono::steady_clock::now(); // // phase_3_duration_ += end3 - start3; @@ -954,6 +974,47 @@ void InfinityThriftService::Explain(infinity_thrift_rpc::SelectResponse &respons } } + // group by + Vector *group_by_list = nullptr; + DeferFn defer_fn10([&]() { + if (group_by_list != nullptr) { + for (auto &expr_ptr : *group_by_list) { + delete expr_ptr; + expr_ptr = nullptr; + } + delete group_by_list; + group_by_list = nullptr; + } + }); + if (!request.group_by_list.empty()) { + group_by_list = new Vector(); + group_by_list->reserve(request.group_by_list.size()); + for (auto &expr : request.group_by_list) { + auto parsed_expr = GetParsedExprFromProto(parsed_expr_status, expr); + if (!parsed_expr_status.ok()) { + ProcessStatus(response, parsed_expr_status); + return; + } + group_by_list->emplace_back(parsed_expr); + } + } + + // having + ParsedExpr *having = nullptr; + DeferFn defer_fn12([&]() { + if (having != nullptr) { + delete having; + having = nullptr; + } + }); + if (request.__isset.having_expr == true) { + having = GetParsedExprFromProto(parsed_expr_status, request.having_expr); + if (!parsed_expr_status.ok()) { + ProcessStatus(response, parsed_expr_status); + return; + } + } + // Explain type auto explain_type = GetExplainTypeFromProto(request.explain_type); const QueryResult result = infinity->Explain(request.db_name, @@ -966,7 +1027,8 @@ void InfinityThriftService::Explain(infinity_thrift_rpc::SelectResponse &respons output_columns, highlight_columns, order_by_list, - nullptr); + group_by_list, + having); output_columns = nullptr; highlight_columns = nullptr; search_expr = nullptr; @@ -974,6 +1036,8 @@ void InfinityThriftService::Explain(infinity_thrift_rpc::SelectResponse &respons limit = nullptr; offset = nullptr; order_by_list = nullptr; + group_by_list = nullptr; + having = nullptr; if (result.IsOk()) { auto &columns = response.column_fields; diff --git a/src/unit_test/main/infinity.cpp b/src/unit_test/main/infinity.cpp index 47af54dac3..474c6b8309 100644 --- a/src/unit_test/main/infinity.cpp +++ b/src/unit_test/main/infinity.cpp @@ -203,7 +203,7 @@ TEST_F(InfinityTest, test1) { SearchExpr *search_expr = nullptr; - result = infinity->Search("default_db", "table1", search_expr, nullptr, nullptr, nullptr, output_columns, nullptr, nullptr, nullptr, false); + result = infinity->Search("default_db", "table1", search_expr, nullptr, nullptr, nullptr, output_columns, nullptr, nullptr, nullptr, nullptr, false); SharedPtr data_block = result.result_table_->GetDataBlockById(0); EXPECT_EQ(data_block->row_count(), 1); Value value = data_block->GetValue(0, 0); diff --git a/src/unit_test/main/table.cpp b/src/unit_test/main/table.cpp index 3009147a22..3d53bcdf7f 100644 --- a/src/unit_test/main/table.cpp +++ b/src/unit_test/main/table.cpp @@ -87,7 +87,7 @@ TEST_F(InfinityTableTest, test1) { QueryResult explain_ast = infinity - ->Explain(db_name, table_name, ExplainType::kAst, nullptr, nullptr, nullptr, nullptr, output_columns, nullptr, nullptr, nullptr); + ->Explain(db_name, table_name, ExplainType::kAst, nullptr, nullptr, nullptr, nullptr, output_columns, nullptr, nullptr, nullptr, nullptr); EXPECT_TRUE(explain_ast.IsOk()); // fmt::print("AST: {}\n", explain_ast.ToString()); } @@ -108,6 +108,7 @@ TEST_F(InfinityTableTest, test1) { output_columns, nullptr, nullptr, + nullptr, nullptr); EXPECT_TRUE(explain_unopt.IsOk()); // fmt::print("Unoptimized logical plan: {}\n", explain_unopt.ToString()); @@ -121,7 +122,7 @@ TEST_F(InfinityTableTest, test1) { QueryResult explain_optimized_logical = infinity - ->Explain(db_name, table_name, ExplainType::kOpt, nullptr, nullptr, nullptr, nullptr, output_columns, nullptr, nullptr, nullptr); + ->Explain(db_name, table_name, ExplainType::kOpt, nullptr, nullptr, nullptr, nullptr, output_columns, nullptr, nullptr, nullptr, nullptr); EXPECT_TRUE(explain_optimized_logical.IsOk()); // fmt::print("Optimized logical plan: {}\n", explain_opt.ToString()); @@ -143,6 +144,7 @@ TEST_F(InfinityTableTest, test1) { output_columns, nullptr, nullptr, + nullptr, nullptr); EXPECT_TRUE(explain_phy.IsOk()); // fmt::print("Physical plan: {}\n", explain_phy.ToString()); @@ -164,6 +166,7 @@ TEST_F(InfinityTableTest, test1) { output_columns, nullptr, nullptr, + nullptr, nullptr); EXPECT_TRUE(explain_fragment.IsOk()); // fmt::print("Fragment: {}\n", explain_fragment.ToString()); @@ -185,6 +188,7 @@ TEST_F(InfinityTableTest, test1) { output_columns, nullptr, nullptr, + nullptr, nullptr); EXPECT_TRUE(explain_pipeline.IsOk()); // fmt::print("Pipeline: {}\n", explain_pipeline.ToString()); diff --git a/test/sql/dql/aggregate/test_groupby_complex.slt b/test/sql/dql/aggregate/test_groupby_complex.slt index d6e1cb4e7f..9f612d333f 100644 --- a/test/sql/dql/aggregate/test_groupby_complex.slt +++ b/test/sql/dql/aggregate/test_groupby_complex.slt @@ -77,6 +77,23 @@ SELECT c3, SUM(c1), SUM(c2) FROM simple_groupby GROUP BY c3; 7.000000 1 1 8.000000 2 2 +query RII rowsort +SELECT c3, SUM(c1), SUM(c2) FROM simple_groupby GROUP BY c3 HAVING SUM(c1) >= 2; +---- +1.000000 2 4 +2.000000 4 3 +3.000000 2 5 +4.000000 4 4 +6.000000 2 3 +8.000000 2 2 + +query RII rowsort +SELECT c3, SUM(c1), SUM(c2) FROM simple_groupby GROUP BY c3 HAVING SUM(c1) >= 2 and c3 > 3; +---- +4.000000 4 4 +6.000000 2 3 +8.000000 2 2 + query RIII rowsort SELECT c3, COUNT(c3), SUM(c1), SUM(c2) FROM simple_groupby GROUP BY c3; ----