Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enh: aggregation functions are not required to split windows #28910

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/zh/14-reference/03-taos-sql/12-distinguished.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ window_clause: {
- _wstart伪列、_wend伪列和_wduration伪列。
- 聚集函数(包括选择函数和可以由参数确定输出行数的时序特有函数)。
- 包含上面表达式的表达式。
- 且至少包含一个聚集函数。
- 窗口子句不可以和 GROUP BY 子句一起使用。
- WHERE 语句可以指定查询的起止时间和其他过滤条件。

Expand Down
3 changes: 0 additions & 3 deletions source/libs/parser/src/parTranslater.c
Original file line number Diff line number Diff line change
Expand Up @@ -4007,9 +4007,6 @@ static EDealRes searchAggFuncNode(SNode* pNode, void* pContext) {
}

static int32_t checkWindowGrpFuncCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (NULL != pSelect->pWindow && !pSelect->hasAggFuncs && !pSelect->hasStateKey) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NO_VALID_FUNC_IN_WIN);
}
if (isWindowJoinStmt(pSelect)) {
if (!pSelect->hasAggFuncs && NULL != pSelect->pHaving) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_WJOIN_HAVING_EXPR);
Expand Down
32 changes: 32 additions & 0 deletions tests/army/frame/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,38 @@ def get_type(self, col):
return "INT UNSIGNED"
if self.cursor.istype(col, "BIGINT UNSIGNED"):
return "BIGINT UNSIGNED"

def check_query_col_data(self, sql1, sql2, colNum):
"""
Execute sql1 and store the colNum-th column of each row in an array.
Execute sql2 and compare the corresponding column of the result with the previously stored result.
Return True if they are the same, otherwise return False.

Args:
sql1 (str): The first SQL query to execute.
sql2 (str): The second SQL query to execute.
colNum (int): The column number to compare (0-based index).

Returns:
bool: True if the colNum-th column of the results of sql1 and sql2 are the same, otherwise False.
"""

# Execute sql1 and store the colNum-th column of each row in an array
self.cursor.execute(sql1)
result1 = self.cursor.fetchall()
col1_data = [row[colNum] for row in result1]

# Execute sql2 and compare the colNum-th column of the result with the previously stored result
self.cursor.execute(sql2)
result2 = self.cursor.fetchall()
col2_data = [row[colNum] for row in result2]

# Compare the two arrays
if col1_data == col2_data:
return
else:
tdLog.info(f"[sql1]:{sql1}, [sql2]:{sql2}, col:{colNum} {col1_data} != {col2_data}")
raise Exception

'''
def taosdStatus(self, state):
Expand Down
40 changes: 31 additions & 9 deletions tests/army/query/test_having.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,40 +219,62 @@ def test_window_having(self):
tdSql.query("SELECT _WSTART, _WEND, COUNT(*) FROM ct_win INTERVAL(15m) having count(*) > 1;")
tdSql.checkRows(5)
tdSql.checkData(0, 2, 2)
tdSql.query("SELECT _WSTART, _WEND FROM ct_win INTERVAL(15m) having count(*) > 1;")
tdSql.checkRows(5)

tdSql.error("SELECT _WSTART, _WEND, COUNT(*) FROM ct_win INTERVAL(15m) having voltage > 12;");
tdSql.error("SELECT _WSTART, _WEND, COUNT(*) FROM ct_win INTERVAL(15m) having voltage > 12;")

tdSql.query("SELECT _wstart, _wend, COUNT(*) AS cnt, FIRST(ts) AS fst, voltage FROM ct_win \
STATE_WINDOW(voltage) having count(*) > 3;");
STATE_WINDOW(voltage) having count(*) > 3;")
tdSql.checkRows(1)
tdSql.checkData(0, 2, 4)
start = tdSql.res[0][0]
end = tdSql.res[0][1]
tdSql.query("SELECT _wstart, _wend, voltage FROM ct_win \
STATE_WINDOW(voltage) having count(*) > 3;")
tdSql.checkRows(1)
tdSql.checkData(0, 0, start)
tdSql.checkData(0, 1, end)

tdSql.error("SELECT _wstart, _wend, COUNT(*) AS cnt, FIRST(ts) AS fst, voltage FROM ct_win \
STATE_WINDOW(voltage) having phase > 0.26;");
STATE_WINDOW(voltage) having phase > 0.26;")

tdSql.query("SELECT _wstart, _wend, COUNT(*), FIRST(ts) FROM ct_win SESSION(ts, 10m) having count(*) > 3;");
tdSql.query("SELECT _wstart, _wend, COUNT(*), FIRST(ts) FROM ct_win SESSION(ts, 10m) having count(*) > 3;")
tdSql.checkRows(1)
tdSql.checkData(0, 2, 5)
start = tdSql.res[0][0]
end = tdSql.res[0][1]
tdSql.query("SELECT _wstart, _wend FROM ct_win SESSION(ts, 10m) having count(*) > 3;")
tdSql.checkRows(1)
tdSql.checkData(0, 0, start)
tdSql.checkData(0, 1, end)

tdSql.error("SELECT _wstart, _wend, COUNT(*), FIRST(ts) FROM ct_win SESSION(ts, 10m) having voltage > 12;");
tdSql.error("SELECT _wstart, _wend, COUNT(*), FIRST(ts) FROM ct_win SESSION(ts, 10m) having voltage > 12;")

tdSql.query("select _wstart, _wend, count(*), first(voltage), last(voltage) from ct_win \
event_window start with voltage <= 12 end with voltage >= 17 having count(*) > 3;");
event_window start with voltage <= 12 end with voltage >= 17 having count(*) > 3;")
tdSql.checkRows(1)
tdSql.checkData(0, 2, 7)
tdSql.checkData(0, 3, 11)
tdSql.checkData(0, 4, 18)

tdSql.error("select _wstart, _wend, count(*) from ct_win \
event_window start with voltage <=12 end with voltage >= 17 having phase > 0.2;");
event_window start with voltage <=12 end with voltage >= 17 having phase > 0.2;")

tdSql.query(
"select _wstart, _wend, count(*), sum(voltage) from ct_win count_window(4) having sum(voltage) > 57;");
"select _wstart, _wend, count(*), sum(voltage) from ct_win count_window(4) having sum(voltage) > 57;")
tdSql.checkRows(1)
tdSql.checkData(0, 2, 4)
tdSql.checkData(0, 3, 61)
start = tdSql.res[0][0]
end = tdSql.res[0][1]
tdSql.query(
"select _wstart, _wend from ct_win count_window(4) having sum(voltage) > 57;")
tdSql.checkRows(1)
tdSql.checkData(0, 0, start)
tdSql.checkData(0, 1, end)

tdSql.error("select _wstart, _wend, count(*), sum(voltage) from ct_win count_window(4) having voltage > 12;");
tdSql.error("select _wstart, _wend, count(*), sum(voltage) from ct_win count_window(4) having voltage > 12;")


def prepare_stream_window_data(self):
Expand Down
27 changes: 27 additions & 0 deletions tests/army/query/window/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,36 @@ def init(self, conn, logSql, replicaVar=1):
tdLog.info(f"insert data.")
jfile = etool.curFile(__file__, "window.json")
etool.benchMark(json=jfile)

def testWithoutAggFunc(self):
sql1 = "select _wstart,_wend, tbname from db.stb partition by tbname event_window start with voltage >2 end with voltage > 15 slimit 5 limit 5"
sql2 = "select _wstart,_wend, tbname, count(voltage) from db.stb partition by tbname event_window start with voltage >2 end with voltage > 15 slimit 5 limit 5"
tdSql.check_query_col_data(sql1, sql2, 0)
tdSql.check_query_col_data(sql1, sql2, 1)
tdSql.check_query_col_data(sql1, sql2, 2)
sql1 = "select _wstart,_wend, tbname from db.stb partition by tbname count_window(600) slimit 5 limit 5"
sql2 = "select _wstart,_wend, tbname, count(voltage) from db.stb partition by tbname count_window(600) slimit 5 limit 5"
tdSql.check_query_col_data(sql1, sql2, 0)
tdSql.check_query_col_data(sql1, sql2, 1)
tdSql.check_query_col_data(sql1, sql2, 2)
sql1 = "select _wstart, _wend from db.stb event_window start with voltage >2 end with voltage > 15 limit 10;"
sql2 = "select _wstart, _wend, count(voltage) from db.stb event_window start with voltage >2 end with voltage > 15 limit 10;"
tdSql.check_query_col_data(sql1, sql2, 0)
tdSql.check_query_col_data(sql1, sql2, 1)
sql1 = "select _wstart,_wend from db.stb count_window(60000)"
sql2 = "select _wstart,_wend, count(voltage) from db.stb count_window(60000)"
tdSql.check_query_col_data(sql1, sql2, 0)
tdSql.check_query_col_data(sql1, sql2, 1)
sql1 = "select _wstart,_wend, tbname from db.stb partition by tbname count_window(60000) order by tbname, _wstart"
sql2 = "select _wstart,_wend, tbname, count(voltage) from db.stb partition by tbname count_window(60000) order by tbname, _wstart"
tdSql.check_query_col_data(sql1, sql2, 0)
tdSql.check_query_col_data(sql1, sql2, 1)
tdSql.check_query_col_data(sql1, sql2, 2)

# run
def run(self):
self.testWithoutAggFunc()

# TD-31660
sql = "select _wstart,_wend,count(voltage),tbname from db.stb partition by tbname event_window start with voltage >2 end with voltage > 15 slimit 5 limit 5"
tdSql.query(sql)
Expand Down
32 changes: 32 additions & 0 deletions tests/pytest/util/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -855,4 +855,36 @@ def redistribute_db_all_vgroups(self, db_name:str = "test", replica:int = 1):
self.redistribute_one_vgroup(db_name, replica, vnode_group_id, useful_trans_dnodes_list)
useful_trans_dnodes_list = cluset_dnodes_list.copy()

def check_query_col_data(self, sql1, sql2, colNum):
"""
Execute sql1 and store the colNum-th column of each row in an array.
Execute sql2 and compare the corresponding column of the result with the previously stored result.
Return True if they are the same, otherwise return False.

Args:
sql1 (str): The first SQL query to execute.
sql2 (str): The second SQL query to execute.
colNum (int): The column number to compare (0-based index).

Returns:
bool: True if the colNum-th column of the results of sql1 and sql2 are the same, otherwise False.
"""

# Execute sql1 and store the colNum-th column of each row in an array
self.cursor.execute(sql1)
result1 = self.cursor.fetchall()
col1_data = [row[colNum] for row in result1]

# Execute sql2 and compare the colNum-th column of the result with the previously stored result
self.cursor.execute(sql2)
result2 = self.cursor.fetchall()
col2_data = [row[colNum] for row in result2]

# Compare the two arrays
if col1_data == col2_data:
return
else:
tdLog.info(f"[sql1]:{sql1}, [sql2]:{sql2}, col:{colNum} {col1_data} != {col2_data}")
raise Exception

tdSql = TDSql()
27 changes: 26 additions & 1 deletion tests/system-test/2-query/group_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,16 +370,31 @@ def test_window(self, nonempty_tb_num):
tdSql.checkRows(nonempty_tb_num)

# state window
tdSql.query(f"select tbname, count(*), c1 from {self.dbname}.{self.stable} partition by tbname state_window(c1)")
sql1 = f"select tbname, c1, count(*) from {self.dbname}.{self.stable} partition by tbname state_window(c1) order by tbname, c1"
sql2 = f"select tbname, c1 from {self.dbname}.{self.stable} partition by tbname state_window(c1) order by tbname, c1"
tdSql.query(sql1)
tdSql.checkRows(nonempty_tb_num * self.row_nums)

tdSql.check_query_col_data(sql1, sql2, 0)
tdSql.check_query_col_data(sql1, sql2, 1)

# session window
tdSql.query(f"select count(c1) from {self.dbname}.{self.stable} partition by tbname session(ts, 5s)")
tdSql.checkRows(nonempty_tb_num)
sql1 = f"select tbname, _wstart, count(c1) from {self.dbname}.{self.stable} partition by tbname session(ts, 5s) order by tbname, _wstart"
sql2 = f"select tbname, _wstart from {self.dbname}.{self.stable} partition by tbname session(ts, 5s) order by tbname, _wstart"
tdSql.check_query_col_data(sql1, sql2, 0)
tdSql.check_query_col_data(sql1, sql2, 1)

# event window
tdSql.query(f"select tbname, count(*) from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9;")
tdSql.checkRows(nonempty_tb_num)
tdSql.query(f"select tbname from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9;")
tdSql.checkRows(nonempty_tb_num)
sql1 = f"select tbname, _wstart, count(*) from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9 order by tbname, _wstart"
sql2 = f"select tbname, _wstart from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9 order by tbname, _wstart"
tdSql.check_query_col_data(sql1, sql2, 0)
tdSql.check_query_col_data(sql1, sql2, 1)

def test_event_window(self, nonempty_tb_num):
tdSql.query(f"select tbname, count(*) from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9 and 1=1;")
Expand All @@ -396,6 +411,8 @@ def test_event_window(self, nonempty_tb_num):

tdSql.query(f"select tbname, count(*) from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9 and t2=0 having count(*) > 10;")
tdSql.checkRows(0)
tdSql.query(f"select tbname from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9 and t2=0 having count(*) > 10;")
tdSql.checkRows(0)

tdSql.query(f"select tbname, count(*) from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9 and _rowts>0;")
tdSql.checkRows(nonempty_tb_num)
Expand All @@ -414,6 +431,14 @@ def test_event_window(self, nonempty_tb_num):
tdSql.error(f"select tbname, count(*) from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9 and _wstart > 1299845454;")
tdSql.error(f"select tbname, count(*) from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9 and _wduration + 1s > 5s;")
tdSql.error(f"select tbname, count(*) from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9 and count(*) > 10;")

tdSql.error(f"select tbname from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9 and _wstart<q_start;")
tdSql.error(f"select tbname from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9 and _wstart - q_start > 0;")
tdSql.error(f"select tbname from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9 and _irowts>0;")
tdSql.error(f"select tbname from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 and _wduration > 5s end with c2 = 9;")
tdSql.error(f"select tbname from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9 and _wstart > 1299845454;")
tdSql.error(f"select tbname from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9 and _wduration + 1s > 5s;")
tdSql.error(f"select tbname from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9 and count(*) > 10;")

def test_error(self):
tdSql.error(f"select * from {self.dbname}.{self.stable} group by t2")
Expand Down
41 changes: 41 additions & 0 deletions tests/system-test/8-stream/stream_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,50 @@ def case1(self):
if not tdSql.getData(2, 0).startswith('new-t3_stb_'):
tdLog.exit("error6")


def caseWithoutAgg(self):

tdSql.execute(f'create database if not exists d2 vgroups 1')
tdSql.execute(f'use d2')
tdSql.execute(f'create table st2(ts timestamp, i int) tags(t int)')
tdSql.execute(f'insert into t1 using st2 tags(1) values(now, 1) (now+1s, 2)')
tdSql.execute(f'insert into t2 using st2 tags(2) values(now, 1) (now+1s, 2)')
tdSql.execute(f'insert into t3 using st2 tags(3) values(now, 1) (now+1s, 2)')

tdSql.execute("create stream stream2_1 fill_history 1 into sta_2 subtable(concat('nee.w-', tname)) AS SELECT "
"_wstart, _wend FROM st2 PARTITION BY tbname tname INTERVAL(1m)", show=True)

tdSql.execute("create stream stream2_2 fill_history 1 into stb_2 subtable(concat('new-', tname)) AS SELECT "
"_wstart, _wend FROM st2 PARTITION BY tbname tname INTERVAL(1m)", show=True)

sql= "select * from sta_2"
tdSql.check_rows_loop(3, sql, loopCount=10, waitTime=0.5)
tdSql.query("select tbname from sta_2 order by tbname")
if not tdSql.getData(0, 0).startswith('nee_w-t1_sta_'):
tdLog.exit("error1")

if not tdSql.getData(1, 0).startswith('nee_w-t2_sta_'):
tdLog.exit("error2")

if not tdSql.getData(2, 0).startswith('nee_w-t3_sta_'):
tdLog.exit("error3")

sql= "select * from stb_2"
tdSql.check_rows_loop(3, sql, loopCount=10, waitTime=0.5)
tdSql.query("select tbname from stb_2 order by tbname")
if not tdSql.getData(0, 0).startswith('new-t1_stb_'):
tdLog.exit("error4")

if not tdSql.getData(1, 0).startswith('new-t2_stb_'):
tdLog.exit("error5")

if not tdSql.getData(2, 0).startswith('new-t3_stb_'):
tdLog.exit("error6")

# run
def run(self):
self.case1()
self.caseWithoutAgg()
# gen data
random.seed(int(time.time()))
self.taosBenchmark(" -d db -t 2 -v 2 -n 1000000 -y")
Expand Down