diff --git a/velox/common/testutil/CMakeLists.txt b/velox/common/testutil/CMakeLists.txt index f33bb98be06f..dbbe57bc843a 100644 --- a/velox/common/testutil/CMakeLists.txt +++ b/velox/common/testutil/CMakeLists.txt @@ -16,5 +16,9 @@ velox_add_library(velox_test_util ScopedTestTime.cpp TestValue.cpp) velox_link_libraries(velox_test_util PUBLIC velox_exception) if(${VELOX_BUILD_TESTING}) + velox_add_library(velox_test_output_matcher OutputMatcher.cpp) + velox_link_libraries(velox_test_output_matcher PUBLIC Folly::folly + GTest::gtest re2::re2) + add_subdirectory(tests) endif() diff --git a/velox/common/testutil/OutputMatcher.cpp b/velox/common/testutil/OutputMatcher.cpp new file mode 100644 index 000000000000..1aaba67ff593 --- /dev/null +++ b/velox/common/testutil/OutputMatcher.cpp @@ -0,0 +1,67 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/testutil/OutputMatcher.h" + +#include + +#include +#include +#include + +void OutputMatcher::compareOutputs( + const std::string& testName, + const std::string& result, + const std::vector& expectedRegex) { + std::string line; + std::string eline; + std::istringstream iss(result); + int lineCount = 0; + int expectedLineIndex = 0; + for (; std::getline(iss, line);) { + lineCount++; + std::vector potentialLines; + auto expectedLine = expectedRegex.at(expectedLineIndex++); + while (!RE2::FullMatch(line, expectedLine.line)) { + potentialLines.push_back(expectedLine.line); + if (!expectedLine.optional) { + ASSERT_FALSE(true) << "Output did not match " << "Source:" << testName + << ", Line number:" << lineCount + << ", Line: " << line << ", Expected Line one of: " + << folly::join(",", potentialLines); + } + expectedLine = expectedRegex.at(expectedLineIndex++); + } + } + for (int i = expectedLineIndex; i < expectedRegex.size(); i++) { + ASSERT_TRUE(expectedRegex[expectedLineIndex].optional); + } +} diff --git a/velox/common/testutil/OutputMatcher.h b/velox/common/testutil/OutputMatcher.h new file mode 100644 index 000000000000..792731c92133 --- /dev/null +++ b/velox/common/testutil/OutputMatcher.h @@ -0,0 +1,47 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +struct ExpectedLine { + std::string line; + bool optional = false; +}; + +class OutputMatcher { + public: + static void compareOutputs( + const std::string& testName, + const std::string& result, + const std::vector& expectedRegex); +}; diff --git a/velox/exec/OutputBuffer.h b/velox/exec/OutputBuffer.h index 640c1bfcd8e7..fa28d743bd8c 100644 --- a/velox/exec/OutputBuffer.h +++ b/velox/exec/OutputBuffer.h @@ -115,6 +115,18 @@ class DestinationBuffer { int64_t bytesSent{0}; int64_t rowsSent{0}; int64_t pagesSent{0}; + + std::string toString() const { + return fmt::format( + "[finished: {}, bytesBuffered: {}, rowsBuffered: {}, pagesBuffered: {}, bytesSent: {}, rowsSent: {}, pagesSent:{}]", + finished, + succinctBytes(bytesBuffered), + rowsBuffered, + pagesBuffered, + succinctBytes(bytesSent), + rowsSent, + pagesSent); + } }; void enqueue(std::shared_ptr data); @@ -254,7 +266,29 @@ class OutputBuffer { /// Stats of the OutputBuffer's destinations. std::vector buffersStats; - std::string toString() const; + std::string toString() const { + std::string destinationBufferStats; + if (!buffersStats.empty()) { + for (int i = 0; i < buffersStats.size(); i++) { + auto& destinationBufferStat = buffersStats[i]; + destinationBufferStats += + fmt::format(" D{}: {}\n", i, destinationBufferStat.toString()); + } + } + + return fmt::format( + "[bufferedBytes: {}, bufferedPages: {}, " + "totalBytesSent: {}, totalRowsSent: {}, totalPagesSent: {}, " + "averageBufferTimeMs: {}, numTopBuffers: {}]\n{}", + succinctBytes(bufferedBytes), + bufferedPages, + succinctBytes(totalBytesSent), + totalRowsSent, + totalPagesSent, + averageBufferTimeMs, + numTopBuffers, + destinationBufferStats); + } }; OutputBuffer( diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index ccd1d9caa81d..a19beeb6c7ce 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -137,6 +137,7 @@ target_link_libraries( velox_hive_connector velox_memory velox_serialization + velox_test_output_matcher velox_test_util velox_type velox_type_test_lib diff --git a/velox/exec/tests/OutputBufferManagerTest.cpp b/velox/exec/tests/OutputBufferManagerTest.cpp index e341ae4329fe..4ad85571a848 100644 --- a/velox/exec/tests/OutputBufferManagerTest.cpp +++ b/velox/exec/tests/OutputBufferManagerTest.cpp @@ -17,12 +17,12 @@ #include #include "folly/experimental/EventCount.h" #include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/testutil/OutputMatcher.h" #include "velox/dwio/common/tests/utils/BatchMaker.h" #include "velox/exec/Task.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/SerializedPageUtil.h" #include "velox/serializers/CompactRowSerializer.h" -#include "velox/serializers/PrestoSerializer.h" #include "velox/serializers/UnsafeRowSerializer.h" using namespace facebook::velox; @@ -1324,6 +1324,44 @@ TEST_P(AllOutputBufferManagerTest, outputBufferUtilization) { verifyOutputBuffer(task, OutputBufferStatus::kFinished); } +TEST_P(AllOutputBufferManagerTest, printOutputBufferStats) { + const vector_size_t vectorSize = 100; + const std::string taskId = std::to_string(folly::Random::rand32()); + const int numDestinations = 4; + initializeTask( + taskId, + rowType_, + core::PartitionedOutputNode::Kind::kPartitioned, + numDestinations, + 1); + + const int numPages = numDestinations; + int totalNumRows = 0; + int totalBytes = 0; + for (int pageId = 0; pageId < numPages; ++pageId) { + const auto pageBytes = enqueue(taskId, pageId, rowType_, vectorSize); + fetchOneAndAck(taskId, pageId, 0); + } + + const auto statsEnqueue = getStats(taskId); + OutputMatcher::compareOutputs( + ::testing::UnitTest::GetInstance()->current_test_info()->name(), + statsEnqueue.toString(), + {{"\\[bufferedBytes: ([\\d.]+[KMGT]?[B]?), bufferedPages: (\\d+), totalBytesSent: ([\\d.]+[KMGT]?[B]?), totalRowsSent: (\\d+), totalPagesSent: (\\d+), averageBufferTimeMs: (\\d+), numTopBuffers: (\\d+)\\]"}, + {"\\s*D0: \\[finished: (true|false), bytesBuffered: ([\\d.]+[KMGT]?[B]?), rowsBuffered: (\\d+), pagesBuffered: (\\d+), bytesSent: ([\\d.]+[KMGT]?[B]?), rowsSent: (\\d+), pagesSent:(\\d+)\\]"}, + {"\\s*D1: \\[finished: (true|false), bytesBuffered: ([\\d.]+[KMGT]?[B]?), rowsBuffered: (\\d+), pagesBuffered: (\\d+), bytesSent: ([\\d.]+[KMGT]?[B]?), rowsSent: (\\d+), pagesSent:(\\d+)\\]"}, + {"\\s*D2: \\[finished: (true|false), bytesBuffered: ([\\d.]+[KMGT]?[B]?), rowsBuffered: (\\d+), pagesBuffered: (\\d+), bytesSent: ([\\d.]+[KMGT]?[B]?), rowsSent: (\\d+), pagesSent:(\\d+)\\]"}, + {"\\s*D3: \\[finished: (true|false), bytesBuffered: ([\\d.]+[KMGT]?[B]?), rowsBuffered: (\\d+), pagesBuffered: (\\d+), bytesSent: ([\\d.]+[KMGT]?[B]?), rowsSent: (\\d+), pagesSent:(\\d+)\\]"}}); + + bufferManager_->updateOutputBuffers(taskId, numDestinations, true); + noMoreData(taskId); + for (int pageId = 0; pageId < numPages; ++pageId) { + fetchEndMarker(taskId, pageId, 1); + deleteResults(taskId, pageId); + } + bufferManager_->removeTask(taskId); +} + TEST_P(AllOutputBufferManagerTest, outputBufferStats) { const vector_size_t vectorSize = 100; const std::string taskId = std::to_string(folly::Random::rand32()); @@ -1355,6 +1393,7 @@ TEST_P(AllOutputBufferManagerTest, outputBufferStats) { fetchOne(taskId, 0, pageId); } const auto statsEnqueue = getStats(taskId); + std::cout << statsEnqueue.toString(); ASSERT_EQ(statsEnqueue.buffersStats[0].pagesBuffered, 1); ASSERT_EQ(statsEnqueue.buffersStats[0].rowsBuffered, vectorSize); if (outputKind_ == core::PartitionedOutputNode::Kind::kBroadcast) { diff --git a/velox/exec/tests/PrintPlanWithStatsTest.cpp b/velox/exec/tests/PrintPlanWithStatsTest.cpp index de6dcd55bd99..9389d5a95dc9 100644 --- a/velox/exec/tests/PrintPlanWithStatsTest.cpp +++ b/velox/exec/tests/PrintPlanWithStatsTest.cpp @@ -14,15 +14,13 @@ * limitations under the License. */ +#include "velox/common/testutil/OutputMatcher.h" #include "velox/exec/PlanNodeStats.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" -#include -#include - using namespace facebook::velox; using namespace facebook::velox::exec::test; @@ -30,40 +28,6 @@ using facebook::velox::exec::test::PlanBuilder; class PrintPlanWithStatsTest : public HiveConnectorTestBase {}; -struct ExpectedLine { - std::string line; - bool optional = false; -}; - -void compareOutputs( - const std::string& testName, - const std::string& result, - const std::vector& expectedRegex) { - std::string line; - std::string eline; - std::istringstream iss(result); - int lineCount = 0; - int expectedLineIndex = 0; - for (; std::getline(iss, line);) { - lineCount++; - std::vector potentialLines; - auto expectedLine = expectedRegex.at(expectedLineIndex++); - while (!RE2::FullMatch(line, expectedLine.line)) { - potentialLines.push_back(expectedLine.line); - if (!expectedLine.optional) { - ASSERT_FALSE(true) << "Output did not match " << "Source:" << testName - << ", Line number:" << lineCount - << ", Line: " << line << ", Expected Line one of: " - << folly::join(",", potentialLines); - } - expectedLine = expectedRegex.at(expectedLineIndex++); - } - } - for (int i = expectedLineIndex; i < expectedRegex.size(); i++) { - ASSERT_TRUE(expectedRegex[expectedLineIndex].optional); - } -} - void ensureTaskCompletion(exec::Task* task) { // ASSERT_TRUE requires a function with return type void. ASSERT_TRUE(waitForTaskCompletion(task)); @@ -129,7 +93,7 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) { "SELECT t.c0, t.c1 + 1, t.c1 + u.c1 FROM t, u WHERE t.c0 = u.c0"); ensureTaskCompletion(task.get()); - compareOutputs( + OutputMatcher::compareOutputs( ::testing::UnitTest::GetInstance()->current_test_info()->name(), printPlanWithStats(*op, task->taskStats()), {{"-- Project\\[4\\]\\[expressions: \\(c0:INTEGER, ROW\\[\"c0\"\\]\\), \\(p1:BIGINT, plus\\(ROW\\[\"c1\"\\],1\\)\\), \\(p2:BIGINT, plus\\(ROW\\[\"c1\"\\],ROW\\[\"u_c1\"\\]\\)\\)\\] -> c0:INTEGER, p1:BIGINT, p2:BIGINT"}, @@ -146,7 +110,7 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) { {" Input: 0 rows \\(.+\\), Output: 100 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: 0B, Memory allocations: .+, Threads: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+)"}}); // with custom stats - compareOutputs( + OutputMatcher::compareOutputs( ::testing::UnitTest::GetInstance()->current_test_info()->name(), printPlanWithStats(*op, task->taskStats(), true), {{"-- Project\\[4\\]\\[expressions: \\(c0:INTEGER, ROW\\[\"c0\"\\]\\), \\(p1:BIGINT, plus\\(ROW\\[\"c1\"\\],1\\)\\), \\(p2:BIGINT, plus\\(ROW\\[\"c1\"\\],ROW\\[\"u_c1\"\\]\\)\\)\\] -> c0:INTEGER, p1:BIGINT, p2:BIGINT"}, @@ -256,7 +220,7 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) { .assertResults( "SELECT c5, max(c0), sum(c1), sum(c2), sum(c3), sum(c4) FROM tmp group by c5"); ensureTaskCompletion(task.get()); - compareOutputs( + OutputMatcher::compareOutputs( ::testing::UnitTest::GetInstance()->current_test_info()->name(), printPlanWithStats(*op, task->taskStats()), {{"-- Aggregation\\[1\\]\\[PARTIAL \\[c5\\] a0 := max\\(ROW\\[\"c0\"\\]\\), a1 := sum\\(ROW\\[\"c1\"\\]\\), a2 := sum\\(ROW\\[\"c2\"\\]\\), a3 := sum\\(ROW\\[\"c3\"\\]\\), a4 := sum\\(ROW\\[\"c4\"\\]\\)\\] -> c5:VARCHAR, a0:BIGINT, a1:BIGINT, a2:BIGINT, a3:DOUBLE, a4:DOUBLE"}, @@ -264,7 +228,7 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) { {" -- TableScan\\[0\\]\\[table: hive_table\\] -> c0:BIGINT, c1:INTEGER, c2:SMALLINT, c3:REAL, c4:DOUBLE, c5:VARCHAR"}, {" Input: 10000 rows \\(.+\\), Output: 10000 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, Splits: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+)"}}); - compareOutputs( + OutputMatcher::compareOutputs( ::testing::UnitTest::GetInstance()->current_test_info()->name(), printPlanWithStats(*op, task->taskStats(), true), {{"-- Aggregation\\[1\\]\\[PARTIAL \\[c5\\] a0 := max\\(ROW\\[\"c0\"\\]\\), a1 := sum\\(ROW\\[\"c1\"\\]\\), a2 := sum\\(ROW\\[\"c2\"\\]\\), a3 := sum\\(ROW\\[\"c3\"\\]\\), a4 := sum\\(ROW\\[\"c4\"\\]\\)\\] -> c5:VARCHAR, a0:BIGINT, a1:BIGINT, a2:BIGINT, a3:DOUBLE, a4:DOUBLE"}, @@ -333,7 +297,7 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) { .splits(makeHiveConnectorSplits({filePath})) .copyResults(pool(), task); ensureTaskCompletion(task.get()); - compareOutputs( + OutputMatcher::compareOutputs( ::testing::UnitTest::GetInstance()->current_test_info()->name(), printPlanWithStats(*writePlan, task->taskStats()), {{R"(-- TableWrite\[1\]\[.+InsertTableHandle .+)"}, @@ -341,7 +305,7 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) { {R"( -- TableScan\[0\]\[table: hive_table\] -> c0:BIGINT, c1:INTEGER, c2:SMALLINT, c3:REAL, c4:DOUBLE, c5:VARCHAR)"}, {R"( Input: 100 rows \(.+\), Output: 100 rows \(.+\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, Splits: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+))"}}); - compareOutputs( + OutputMatcher::compareOutputs( ::testing::UnitTest::GetInstance()->current_test_info()->name(), printPlanWithStats(*writePlan, task->taskStats(), true), {{R"(-- TableWrite\[1\]\[.+InsertTableHandle .+)"},