Skip to content

Commit

Permalink
Stable files for results/tables (ydb-platform#3695)
Browse files Browse the repository at this point in the history
  • Loading branch information
vitstn authored Apr 13, 2024
1 parent b3fbe12 commit abd10c9
Show file tree
Hide file tree
Showing 59 changed files with 9,958 additions and 9,853 deletions.
30 changes: 24 additions & 6 deletions ydb/library/yql/sql/pg/pg_sql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ class TConverter : public IPGParseEvents {
BlockEngineEnabled = true;
} else if (flag == "BlockEngineForce") {
BlockEngineForce = true;
} if (flag == "UnorderedResult") {
UnorderedResult = true;
}
}

Expand Down Expand Up @@ -1314,7 +1316,7 @@ class TConverter : public IPGParseEvents {
return State.Statements.back();
}

auto resOptions = QL(QL(QA("type")), QL(QA("autoref")));
auto resOptions = BuildResultOptions(!sort);
State.Statements.push_back(L(A("let"), A("output"), output));
State.Statements.push_back(L(A("let"), A("result_sink"), L(A("DataSink"), QA(TString(NYql::ResultProviderName)))));
State.Statements.push_back(L(A("let"), A("world"), L(A("Write!"),
Expand All @@ -1324,6 +1326,17 @@ class TConverter : public IPGParseEvents {
return State.Statements.back();
}

TAstNode* BuildResultOptions(bool unordered) {
TVector<TAstNode*> options;
options.push_back(QL(QA("type")));
options.push_back(QL(QA("autoref")));
if (unordered && UnorderedResult) {
options.push_back(QL(QA("unordered")));
}

return QVL(options.data(), options.size());
}

[[nodiscard]]
bool ParseWithClause(const WithClause* value) {
AT_LOCATION(value);
Expand Down Expand Up @@ -2251,7 +2264,7 @@ class TConverter : public IPGParseEvents {
}
}

if (name == "useblocks" || name == "emitaggapply") {
if (name == "useblocks" || name == "emitaggapply" || name == "unorderedresult") {
if (ListLength(value->args) != 1) {
AddError(TStringBuilder() << "VariableSetStmt, expected 1 arg, but got: " << ListLength(value->args));
return nullptr;
Expand All @@ -2260,9 +2273,13 @@ class TConverter : public IPGParseEvents {
auto arg = ListNodeNth(value->args, 0);
if (NodeTag(arg) == T_A_Const && (NodeTag(CAST_NODE(A_Const, arg)->val) == T_String)) {
TString rawStr = StrVal(CAST_NODE(A_Const, arg)->val);
auto configSource = L(A("DataSource"), QA(TString(NYql::ConfigProviderName)));
State.Statements.push_back(L(A("let"), A("world"), L(A(TString(NYql::ConfigureName)), A("world"), configSource,
QA(TString(rawStr == "true" ? "" : "Disable") + TString((name == "useblocks") ? "UseBlocks" : "PgEmitAggApply")))));
if (name == "unorderedresult") {
UnorderedResult = (rawStr == "true");
} else {
auto configSource = L(A("DataSource"), QA(TString(NYql::ConfigProviderName)));
State.Statements.push_back(L(A("let"), A("world"), L(A(TString(NYql::ConfigureName)), A("world"), configSource,
QA(TString(rawStr == "true" ? "" : "Disable") + TString((name == "useblocks") ? "UseBlocks" : "PgEmitAggApply")))));
}
} else {
AddError(TStringBuilder() << "VariableSetStmt, expected string literal for " << value->name << " option");
return nullptr;
Expand Down Expand Up @@ -2546,7 +2563,7 @@ class TConverter : public IPGParseEvents {
State.Statements.push_back(L(A("let"), A("output"), output));
State.Statements.push_back(L(A("let"), A("result_sink"), L(A("DataSink"), QA(TString(NYql::ResultProviderName)))));

const auto resOptions = QL(QL(QA("type")), QL(QA("autoref")));
const auto resOptions = BuildResultOptions(true);
State.Statements.push_back(L(A("let"), A("world"), L(A("Write!"),
A("world"), A("result_sink"), L(A("Key")), A("output"), resOptions)));
State.Statements.push_back(L(A("let"), A("world"), L(A("Commit!"),
Expand Down Expand Up @@ -4851,6 +4868,7 @@ class TConverter : public IPGParseEvents {
bool DqEngineForce = false;
bool BlockEngineEnabled = false;
bool BlockEngineForce = false;
bool UnorderedResult = false;
TString TablePathPrefix;
TVector<ui32> RowStarts;
ui32 QuerySize;
Expand Down
73 changes: 71 additions & 2 deletions ydb/library/yql/tests/common/test_framework/yql_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -823,8 +823,11 @@ def normalize_table_yson(y):
return y


def dump_table_yson(res_yson):
return cyson.dumps(sorted(normalize_table_yson(cyson.loads('[' + res_yson + ']'))), format="pretty")
def dump_table_yson(res_yson, sort=True):
rows = normalize_table_yson(cyson.loads('[' + res_yson + ']'))
if sort:
rows = sorted(rows)
return cyson.dumps(rows, format="pretty")


def normalize_source_code_path(s):
Expand Down Expand Up @@ -916,6 +919,72 @@ def normalize_result(res, sort):
return res


def stable_write(writer, node):
if hasattr(node, 'attributes'):
writer.begin_attributes()
for k in sorted(node.attributes.keys()):
writer.key(k)
stable_write(writer, node.attributes[k])
writer.end_attributes()
if isinstance(node, list):
writer.begin_list()
for r in node:
stable_write(writer, r)
writer.end_list()
return
if isinstance(node, dict):
writer.begin_map()
for k in sorted(node.keys()):
writer.key(k)
stable_write(writer, node[k])
writer.end_map()
return
writer.write(node)


def stable_result_file(res):
path = res.results_file
assert os.path.exists(path)
with open(path) as f:
res = f.read()
res = cyson.loads(res)
res = replace_vals(res)
for r in res:
for data in r['Write']:
if 'Unordered' in r and 'Data' in data:
data['Data'] = sorted(data['Data'])
with open(path, 'w') as f:
writer = cyson.Writer(stream=cyson.OutputStream.from_file(f), format='pretty', mode='node')
writer.begin_stream()
stable_write(writer, res)
writer.end_stream()
with open(path) as f:
return f.read()


def stable_table_file(table):
path = table.file
assert os.path.exists(path)
assert table.attr is not None
is_sorted = False
for column in cyson.loads(table.attr)['schema']:
if 'sort_order' in column:
is_sorted = True
break
if not is_sorted:
with open(path) as f:
r = cyson.Reader(cyson.InputStream.from_file(f), mode='list_fragment')
lst = sorted(list(r.list_fragments()))
with open(path, 'w') as f:
writer = cyson.Writer(stream=cyson.OutputStream.from_file(f), format='pretty', mode='list_fragment')
writer.begin_stream()
for r in lst:
stable_write(writer, r)
writer.end_stream()
with open(path) as f:
return f.read()


class LoggingDowngrade(object):

def __init__(self, loggers, level=logging.CRITICAL):
Expand Down
37 changes: 21 additions & 16 deletions ydb/library/yql/tests/sql/dq_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

import yatest.common
from yql_utils import get_supported_providers, yql_binary_path, is_xfail, is_skip_forceblocks, get_param, \
normalize_source_code_path, dump_table_yson, get_gateway_cfg_suffix, do_custom_query_check, normalize_result
normalize_source_code_path, dump_table_yson, get_gateway_cfg_suffix, do_custom_query_check, normalize_result, \
stable_result_file, stable_table_file

from utils import get_config, DATA_PATH
from file_common import run_file, run_file_no_cache
Expand Down Expand Up @@ -47,19 +48,17 @@ def run_test(suite, case, cfg, tmpdir, what, yql_http_file_server):
if force_blocks and re.search(r"skip force_blocks", sql_query):
pytest.skip('skip force blocks requested')

sort = not 'order' in sql_query.lower()

dq_res_yson = normalize_result(res.results, sort)

if 'ytfile can not' in sql_query or 'yt' not in get_supported_providers(config):
if force_blocks:
pytest.skip('ForceBlocks skipped - provider disabled')
if do_custom_query_check(res, sql_query):
return None
if os.path.exists(res.results_file):
stable_result_file(res)
to_canonize.append(yatest.common.canonical_file(res.results_file))
for table in tables_res:
if os.path.exists(tables_res[table].file):
stable_table_file(tables_res[table])
to_canonize.append(yatest.common.canonical_file(tables_res[table].file))
to_canonize.append(yatest.common.canonical_file(tables_res[table].yqlrun_file + ".attr"))
else:
Expand All @@ -76,23 +75,29 @@ def run_test(suite, case, cfg, tmpdir, what, yql_http_file_server):

if do_custom_query_check(yqlrun_res, sql_query):
return None
yqlrun_res_yson = normalize_result(yqlrun_res.results, sort)

if os.path.exists(yqlrun_res.results_file):
assert os.path.exists(res.results_file)
dq_res_yson = normalize_result(stable_result_file(res), False)
yqlrun_res_yson = normalize_result(stable_result_file(yqlrun_res), False)

# Compare results
assert dq_res_yson == yqlrun_res_yson, 'RESULTS_DIFFER\n' \
'%(dq_result_name)s result:\n %(dq_res_yson)s\n\n' \
'%(yqlrun_result_name)s result:\n %(yqlrun_res_yson)s\n' % locals()
# Compare results
assert dq_res_yson == yqlrun_res_yson, 'RESULTS_DIFFER\n' \
'%(dq_result_name)s result:\n %(dq_res_yson)s\n\n' \
'%(yqlrun_result_name)s result:\n %(yqlrun_res_yson)s\n' % locals()

for table in yqlrun_tables_res:
assert table in tables_res

yqlrun_table_yson = dump_table_yson(yqlrun_tables_res[table].content)
dq_table_yson = dump_table_yson(tables_res[table].content)
if os.path.exists(yqlrun_tables_res[table].file):
assert os.path.exists(tables_res[table].file)
yqlrun_table_yson = dump_table_yson(stable_table_file(yqlrun_tables_res[table]), False)
dq_table_yson = dump_table_yson(stable_table_file(tables_res[table]), False)

assert yqlrun_table_yson == dq_table_yson, \
'OUT_TABLE_DIFFER: %(table)s\n' \
'%(dq_result_name)s table:\n %(dq_table_yson)s\n\n' \
'%(yqlrun_result_name)s table:\n %(yqlrun_table_yson)s\n' % locals()
assert yqlrun_table_yson == dq_table_yson, \
'OUT_TABLE_DIFFER: %(table)s\n' \
'%(dq_result_name)s table:\n %(dq_table_yson)s\n\n' \
'%(yqlrun_result_name)s table:\n %(yqlrun_table_yson)s\n' % locals()

if force_blocks:
return None
Expand Down
Loading

0 comments on commit abd10c9

Please sign in to comment.