Skip to content

Commit

Permalink
Groupby2 (#2491)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Fix: alias column.
Fix: order by with group by.
Fix: http column order.
Add group by to infinity_sdk, embedding_infinity, infinity http.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)
- [x] Test cases
  • Loading branch information
small-turtle-1 authored Jan 24, 2025
1 parent 5c97af6 commit 28c903a
Show file tree
Hide file tree
Showing 20 changed files with 941 additions and 73 deletions.
4 changes: 4 additions & 0 deletions docs/references/http_api_reference.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -1776,6 +1776,7 @@ Searches for data in a specified table. The search can range from a simple vecto
- `"filter"`: `string`
- `"fusion"`: `object`
- `"sort"` : `object[]`
- `"group_by"`: `string[]`
- `"limit"` : `string`
- `"offset"` : `string`
- `"option"` : `object`
Expand Down Expand Up @@ -2013,6 +2014,9 @@ curl --request GET \
- `"sort"` : `object[]`
Defines how to sort the results.
- `"group_by"`: `string[]`
Indicates the expression to group by.
- `"limit"` : `string`
Indicates the limit row count.
Expand Down
30 changes: 30 additions & 0 deletions docs/references/pysdk_api_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1876,6 +1876,36 @@ table_obj.output(["c1", "c2"]).sort([["c2", SortType.Asc], ["c1", SortType.Desc]

---

### group_by

```python
table_object.group_by(group_by_columns)
```

Creates a group-by expression using `group_by_columns`.

#### Parameters

##### group_by_columns: `list[str] | str`, *Required*

A list of strings specifying the expression to group by. Each string in the list represents a column name or an expression.

#### Returns

- Success: An `infinity.local_infinity.table.LocalTable` object in embedded mode or an `infinity.remote_thrift.table.RemoteTable` object in client-server mode.
- Failure: `InfinityException`
- `error_code`: `int` A non-zero value indicating a specific error condition.
- `error_msg`: `str` A message providing additional details about the error.

#### Examples

```python
table_obj.output(["c1", "sum(c2)"]).group_by(["c1"]).to_df()
table_obj.output(["c1", "avg(c1)", "count(c2)", "min(c3)", "max(c4)"]).group_by(["c1", "c1+c2"]).to_df()
```

---

### limit

```python
Expand Down
12 changes: 12 additions & 0 deletions python/infinity_embedded/local_infinity/query_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,18 @@ def offset(self, offset: Optional[int]) -> InfinityLocalQueryBuilder:
self._offset = offset_expr
return self

def group_by(self, columns: List[str] | str):
group_by_list = []
if isinstance(columns, list):
for column in columns:
parsed_expr = parse_expr(maybe_parse(column))
group_by_list.append(parsed_expr)
else:
parsed_expr = parse_expr(maybe_parse(columns))
group_by_list.append(parsed_expr)
self._group_by = group_by_list
return self

def output(self, columns: Optional[list]) -> InfinityLocalQueryBuilder:
self._columns = columns
select_list: List[WrapParsedExpr] = []
Expand Down
6 changes: 6 additions & 0 deletions python/infinity_embedded/local_infinity/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,12 @@ def limit(self, limit: Optional[int]):
def offset(self, offset: Optional[int]):
self.query_builder.offset(offset)
return self

def group_by(self, group_by_expr_list: Optional[List[str]] | Optional[str]):
if group_by_expr_list is None:
return self
self.query_builder.group_by(group_by_expr_list)
return self

def sort(self, order_by_expr_list: Optional[List[list[str, SortType]]]):
for order_by_expr in order_by_expr_list:
Expand Down
5 changes: 5 additions & 0 deletions python/infinity_embedded/local_infinity/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@


def traverse_conditions(cons, fn=None):
if isinstance(cons, exp.Alias):
expr = traverse_conditions(cons.args['this'])
expr.alias_name = cons.alias
return expr

if isinstance(cons, exp.Binary):
parsed_expr = WrapParsedExpr()
function_expr = WrapFunctionExpr()
Expand Down
68 changes: 44 additions & 24 deletions python/infinity_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ def create_table(
fields.append(tmp)
except:
raise InfinityException(ErrorCode.SYNTAX_ERROR, "http adapter create table parse error")
print(fields)
# print(fields)

url = f"databases/{self.database_name}/tables/{table_name}"
h = self.net.set_up_header(["accept", "content-type"])
Expand Down Expand Up @@ -712,6 +712,7 @@ def __init__(self, output: list, table_http: table_http):
self._match_sparse = []
self._search_exprs = []
self._sort = []
self._group_by = []
self._limit = None
self._offset = None
self._option = None
Expand All @@ -730,6 +731,8 @@ def select(self):
tmp["highlight"] = self._highlight
if len(self._sort):
tmp["sort"] = self._sort
if len(self._group_by):
tmp["group_by"] = self._group_by
if self._limit is not None:
tmp["limit"] = str(self._limit)
if self._offset is not None:
Expand Down Expand Up @@ -774,6 +777,10 @@ def explain(self, ExplainType=ExplainType.Physical):
tmp["output"] = self._output
if len(self._highlight):
tmp["highlight"] = self._highlight
if len(self._sort):
tmp["sort"] = self._sort
if len(self._group_by):
tmp["group_by"] = self._group_by
if self._limit is not None:
tmp["limit"] = self._limit
if self._offset is not None:
Expand Down Expand Up @@ -827,6 +834,10 @@ def limit(self, limit_num):
def offset(self, offset):
self._offset = offset
return self

def group_by(self, group_by_list):
self._group_by = group_by_list
return self

def option(self, option: {}):
# option_str = json.dumps(option)
Expand Down Expand Up @@ -915,32 +926,37 @@ def to_result(self):
for col in col_types:
df_dict[col] = ()

line_i = 0
for res in self.output_res:
for k in res:
# print(res[k])
if k not in df_dict:
df_dict[k] = ()
tup = df_dict[k]
if isinstance(res[k], (int, float)):
new_tup = tup + (res[k],)
elif is_list(res[k]):
new_tup = tup + (ast.literal_eval(res[k]),)
elif is_date(res[k]):
new_tup = tup + (res[k],)
elif is_time(res[k]):
new_tup = tup + (res[k],)
elif is_datetime(res[k]):
new_tup = tup + (res[k],)
elif is_sparse(res[k]): # sparse vector
sparse_vec = str2sparse(res[k])
for col in res:
col_name = next(iter(col))
v = col[col_name]
if col_name not in df_dict:
df_dict[col_name] = ()
tup = df_dict[col_name]
if len(tup) == line_i + 1:
continue
if isinstance(v, (int, float)):
new_tup = tup + (v,)
elif is_list(v):
new_tup = tup + (ast.literal_eval(v),)
elif is_date(v):
new_tup = tup + (v,)
elif is_time(v):
new_tup = tup + (v,)
elif is_datetime(v):
new_tup = tup + (v,)
elif is_sparse(v): # sparse vector
sparse_vec = str2sparse(v)
new_tup = tup + (sparse_vec,)
else:
if res[k].lower() == 'true':
res[k] = True
elif res[k].lower() == 'false':
res[k] = False
new_tup = tup + (res[k],)
df_dict[k] = new_tup
if v.lower() == 'true':
v = True
elif v.lower() == 'false':
v = False
new_tup = tup + (v,)
df_dict[col_name] = new_tup
line_i += 1
# print(self.output_res)
# print(df_dict)
extra_result = None
Expand All @@ -960,6 +976,7 @@ def to_result(self):
k1 = k1.replace("+", " ")
k1 = k1.replace("-", " ")
cols = k1.split(" ")
cols = [col for col in cols if col != ""]
# print(cols)

function_name = ""
Expand All @@ -974,6 +991,9 @@ def to_result(self):
elif is_float(col.strip()):
df_type[k] = dtype('float64')
df_type[k] = function_return_type(function_name, df_type[k])
elif col == "/":
df_type[k] = dtype('float64')
break
else:
function_name = col.strip().lower()
if (function_name in functions):
Expand Down
11 changes: 11 additions & 0 deletions python/infinity_sdk/infinity/remote_thrift/query_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,17 @@ def offset(self, offset: Optional[int]) -> InfinityThriftQueryBuilder:
offset_expr = ParsedExpr(type=expr_type)
self._offset = offset_expr
return self

def group_by(self, columns: List[str] | str) -> InfinityThriftQueryBuilder:
group_by_list: List[ParsedExpr] = []
if isinstance(columns, list):
for column in columns:
column = column.lower()
group_by_list.append(parse_expr(maybe_parse(column)))
else:
group_by_list.append(parse_expr(maybe_parse(columns)))
self._groupby = group_by_list
return self

def output(self, columns: Optional[list]) -> InfinityThriftQueryBuilder:
self._columns = columns
Expand Down
8 changes: 7 additions & 1 deletion python/infinity_sdk/infinity/remote_thrift/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,12 @@ def limit(self, limit: Optional[int]):
def offset(self, offset: Optional[int]):
self.query_builder.offset(offset)
return self

def group_by(self, group_by_expr_list: Optional[List[str]] | Optional[str]):
if group_by_expr_list is None:
return self
self.query_builder.group_by(group_by_expr_list)
return self

def sort(self, order_by_expr_list: Optional[List[list[str, SortType]]]):
for order_by_expr in order_by_expr_list:
Expand Down Expand Up @@ -500,7 +506,7 @@ def _execute_query(self, query: Query) -> tuple[dict[str, list[Any]], dict[str,
highlight_list=query.highlight,
search_expr=query.search,
where_expr=query.filter,
group_by_list=None,
group_by_list=query.groupby,
limit_expr=query.limit,
offset_expr=query.offset,
order_by_list=query.sort,
Expand Down
5 changes: 5 additions & 0 deletions python/infinity_sdk/infinity/remote_thrift/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ def generic_match_to_string(generic_match_expr: ttypes.GenericMatchExpr) -> str:


def traverse_conditions(cons, fn=None) -> ttypes.ParsedExpr:
if isinstance(cons, exp.Alias):
expr = traverse_conditions(cons.args['this'])
expr.alias_name = cons.alias
return expr

if isinstance(cons, exp.Binary):
parsed_expr = ttypes.ParsedExpr()
function_expr = ttypes.FunctionExpr()
Expand Down
16 changes: 15 additions & 1 deletion python/test_pysdk/common/common_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
]

functions = [
"sqrt", "round", "ceil", "floor", "filter_text", "filter_fulltext", "or", "and", "not"
"sqrt", "round", "ceil", "floor", "filter_text", "filter_fulltext", "or", "and", "not", "char_length",
"sum", "min", "max", "count", "avg"
]

bool_functions = [
Expand All @@ -57,6 +58,19 @@ def function_return_type(function_name, param_type) :
return dtype('bool')
elif function_name == "trunc":
return dtype('str_')
elif function_name == "char_length":
return dtype('int32')
elif function_name == "sum":
if(param_type == dtype('int8') or param_type == dtype('int16') or param_type == dtype('int32') or param_type == dtype('int64')):
return dtype('int64')
else:
return dtype('float64')
elif function_name == "min" or function_name == "max":
return param_type
elif function_name == "count":
return dtype('int64')
elif function_name == "avg":
return dtype('float64')
else:
return param_type

Expand Down
Loading

0 comments on commit 28c903a

Please sign in to comment.