diff --git a/velox/docs/develop/testing/join-fuzzer.rst b/velox/docs/develop/testing/join-fuzzer.rst index 1bbfbfc7df41b..3e16434e986d4 100644 --- a/velox/docs/develop/testing/join-fuzzer.rst +++ b/velox/docs/develop/testing/join-fuzzer.rst @@ -15,7 +15,7 @@ combined with randomly generated payload. When generating the join plan node, fuzzer shuffles join keys and output columns and randomly drops some columns from the output. -The fuzzer runs the query plan and compares the results with DuckDB. +The fuzzer runs the query plan and compares the results with those of a reference database (DuckDB or Presto), which are treated as the expected result. The fuzzer then generates a set of different but logically equivalent plans, runs them and verifies that results are the same. Each plan runs twice: with @@ -65,4 +65,8 @@ Here is a full list of supported command line arguments. * ``--enable_spill``: Whether to test with spilling or not. Default is true. +* ``--arbitrator_capacity``: Arbitrator capacity in bytes. Default is 6L << 30 (6 GiB). + +* ``--allocator_capacity``: Allocator capacity in bytes. Default is 8L << 30 (8 GiB). + If running from CLion IDE, add ``--logtostderr=1`` to see the full output. diff --git a/velox/docs/develop/testing/row-number-fuzzer.rst b/velox/docs/develop/testing/row-number-fuzzer.rst index 6f304a50f72b8..381c4813e9855 100644 --- a/velox/docs/develop/testing/row-number-fuzzer.rst +++ b/velox/docs/develop/testing/row-number-fuzzer.rst @@ -52,4 +52,8 @@ Here is a full list of supported command line arguments. * ``--req_timeout_ms`` Timeout in milliseconds of an HTTP request to the PrestoQueryRunner. +* ``--arbitrator_capacity``: Arbitrator capacity in bytes. Default is 6L << 30 (6 GiB). + +* ``--allocator_capacity``: Allocator capacity in bytes. Default is 8L << 30 (8 GiB). + If running from CLion IDE, add ``--logtostderr=1`` to see the full output. 
diff --git a/velox/exec/fuzzer/DuckQueryRunner.cpp b/velox/exec/fuzzer/DuckQueryRunner.cpp index e19b1d33a7b08..18a791f467861 100644 --- a/velox/exec/fuzzer/DuckQueryRunner.cpp +++ b/velox/exec/fuzzer/DuckQueryRunner.cpp @@ -121,6 +121,17 @@ std::multiset> DuckQueryRunner::execute( return queryRunner.execute(sql, resultType); } +std::multiset> DuckQueryRunner::execute( + const std::string& sql, + const std::vector& probeInput, + const std::vector& buildInput, + const RowTypePtr& resultType) { + DuckDbQueryRunner queryRunner; + queryRunner.createTable("t", probeInput); + queryRunner.createTable("u", buildInput); + return queryRunner.execute(sql, resultType); +} + std::optional DuckQueryRunner::toSql( const core::PlanNodePtr& plan) { if (!isSupported(plan->outputType())) { @@ -153,6 +164,16 @@ std::optional DuckQueryRunner::toSql( return toSql(rowNumberNode); } + if (const auto joinNode = + std::dynamic_pointer_cast(plan)) { + return toSql(joinNode); + } + + if (const auto joinNode = + std::dynamic_pointer_cast(plan)) { + return toSql(joinNode); + } + VELOX_NYI(); } @@ -329,4 +350,123 @@ std::optional DuckQueryRunner::toSql( return sql.str(); } + +std::optional DuckQueryRunner::toSql( + const std::shared_ptr& joinNode) { + const auto& joinKeysToSql = [](auto keys) { + std::stringstream out; + for (auto i = 0; i < keys.size(); ++i) { + if (i > 0) { + out << ", "; + } + out << keys[i]->name(); + } + return out.str(); + }; + + const auto& equiClausesToSql = [](auto joinNode) { + std::stringstream out; + for (auto i = 0; i < joinNode->leftKeys().size(); ++i) { + if (i > 0) { + out << " AND "; + } + out << joinNode->leftKeys()[i]->name() << " = " + << joinNode->rightKeys()[i]->name(); + } + return out.str(); + }; + + const auto& outputNames = joinNode->outputType()->names(); + + std::stringstream sql; + if (joinNode->isLeftSemiProjectJoin()) { + sql << "SELECT " + << folly::join(", ", outputNames.begin(), --outputNames.end()); + } else { + sql << "SELECT " << 
folly::join(", ", outputNames); + } + + switch (joinNode->joinType()) { + case core::JoinType::kInner: + sql << " FROM t INNER JOIN u ON " << equiClausesToSql(joinNode); + break; + case core::JoinType::kLeft: + sql << " FROM t LEFT JOIN u ON " << equiClausesToSql(joinNode); + break; + case core::JoinType::kFull: + sql << " FROM t FULL OUTER JOIN u ON " << equiClausesToSql(joinNode); + break; + case core::JoinType::kLeftSemiFilter: + if (joinNode->leftKeys().size() > 1) { + return std::nullopt; + } + sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys()) + << " IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) + << " FROM u)"; + break; + case core::JoinType::kLeftSemiProject: + if (joinNode->isNullAware()) { + sql << ", " << joinKeysToSql(joinNode->leftKeys()) << " IN (SELECT " + << joinKeysToSql(joinNode->rightKeys()) << " FROM u) FROM t"; + } else { + sql << ", EXISTS (SELECT * FROM u WHERE " << equiClausesToSql(joinNode) + << ") FROM t"; + } + break; + case core::JoinType::kAnti: + if (joinNode->isNullAware()) { + sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys()) + << " NOT IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) + << " FROM u)"; + } else { + sql << " FROM t WHERE NOT EXISTS (SELECT * FROM u WHERE " + << equiClausesToSql(joinNode) << ")"; + } + break; + default: + VELOX_UNREACHABLE( + "Unknown join type: {}", static_cast(joinNode->joinType())); + } + + return sql.str(); +} + +std::optional DuckQueryRunner::toSql( + const std::shared_ptr& joinNode) { + const auto& joinKeysToSql = [](auto keys) { + std::stringstream out; + for (auto i = 0; i < keys.size(); ++i) { + if (i > 0) { + out << ", "; + } + out << keys[i]->name(); + } + return out.str(); + }; + + const auto& outputNames = joinNode->outputType()->names(); + std::stringstream sql; + sql << "SELECT " << folly::join(", ", outputNames); + // Nested loop join without filter. 
+ VELOX_CHECK( + joinNode->joinCondition() == nullptr, + "This code path should be called only for nested loop join without filter"); + const std::string joinCondition{"(1 = 1)"}; + switch (joinNode->joinType()) { + case core::JoinType::kInner: + sql << " FROM t INNER JOIN u ON " << joinCondition; + break; + case core::JoinType::kLeft: + sql << " FROM t LEFT JOIN u ON " << joinCondition; + break; + case core::JoinType::kFull: + sql << " FROM t FULL OUTER JOIN u ON " << joinCondition; + break; + default: + VELOX_UNREACHABLE( + "Unknown join type: {}", static_cast(joinNode->joinType())); + } + + return sql.str(); +} } // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/DuckQueryRunner.h b/velox/exec/fuzzer/DuckQueryRunner.h index a5dc3f785716a..bab0cc5a60d1c 100644 --- a/velox/exec/fuzzer/DuckQueryRunner.h +++ b/velox/exec/fuzzer/DuckQueryRunner.h @@ -39,6 +39,12 @@ class DuckQueryRunner : public ReferenceQueryRunner { const std::vector& input, const RowTypePtr& resultType) override; + std::multiset> execute( + const std::string& sql, + const std::vector& probeInput, + const std::vector& buildInput, + const RowTypePtr& resultType) override; + private: std::optional toSql( const std::shared_ptr& aggregationNode); @@ -52,6 +58,12 @@ class DuckQueryRunner : public ReferenceQueryRunner { std::optional toSql( const std::shared_ptr& rowNumberNode); + std::optional toSql( + const std::shared_ptr& joinNode); + + std::optional toSql( + const std::shared_ptr& joinNode); + std::unordered_set aggregateFunctionNames_; }; diff --git a/velox/exec/fuzzer/FuzzerUtil.cpp b/velox/exec/fuzzer/FuzzerUtil.cpp index a3efb12c5d9d8..c7d8ddd07e416 100644 --- a/velox/exec/fuzzer/FuzzerUtil.cpp +++ b/velox/exec/fuzzer/FuzzerUtil.cpp @@ -16,6 +16,7 @@ #include "velox/exec/fuzzer/FuzzerUtil.h" #include #include +#include "velox/common/memory/SharedArbitrator.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/connectors/hive/HiveConnectorSplit.h" #include 
"velox/dwio/catalog/fbhive/FileUtils.h" @@ -211,4 +212,17 @@ bool containsUnsupportedTypes(const TypePtr& type) { containsTypeKind(type, TypeKind::VARBINARY) || containsType(type, INTERVAL_DAY_TIME()); } + +void setupMemory(int64_t allocatorCapacity, int64_t arbitratorCapacity) { + FLAGS_velox_enable_memory_usage_track_in_default_memory_pool = true; + FLAGS_velox_memory_leak_check_enabled = true; + facebook::velox::memory::SharedArbitrator::registerFactory(); + facebook::velox::memory::MemoryManagerOptions options; + options.allocatorCapacity = allocatorCapacity; + options.arbitratorCapacity = arbitratorCapacity; + options.arbitratorKind = "SHARED"; + options.checkUsageLeak = true; + options.arbitrationStateCheckCb = memoryArbitrationStateCheck; + facebook::velox::memory::MemoryManager::initialize(options); +} } // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/FuzzerUtil.h b/velox/exec/fuzzer/FuzzerUtil.h index 5a210b6357b2b..81efca5f90236 100644 --- a/velox/exec/fuzzer/FuzzerUtil.h +++ b/velox/exec/fuzzer/FuzzerUtil.h @@ -68,4 +68,7 @@ RowTypePtr concat(const RowTypePtr& a, const RowTypePtr& b); /// /// TODO Investigate mismatches reported when comparing Varbinary. bool containsUnsupportedTypes(const TypePtr& type); + +// Invoked to set up memory system with arbitration. 
+void setupMemory(int64_t allocatorCapacity, int64_t arbitratorCapacity); } // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/JoinFuzzer.cpp b/velox/exec/fuzzer/JoinFuzzer.cpp index 4504c71357ca1..39742fd0176f5 100644 --- a/velox/exec/fuzzer/JoinFuzzer.cpp +++ b/velox/exec/fuzzer/JoinFuzzer.cpp @@ -23,6 +23,7 @@ #include "velox/dwio/dwrf/writer/Writer.h" #include "velox/exec/OperatorUtils.h" #include "velox/exec/fuzzer/FuzzerUtil.h" +#include "velox/exec/fuzzer/ReferenceQueryRunner.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" @@ -66,7 +67,9 @@ namespace { class JoinFuzzer { public: - explicit JoinFuzzer(size_t initialSeed); + JoinFuzzer( + size_t initialSeed, + std::unique_ptr referenceQueryRunner); void go(); @@ -260,11 +263,10 @@ class JoinFuzzer { RowVectorPtr execute(const PlanWithSplits& plan, bool injectSpill); - template - std::optional computeDuckDbResult( + std::optional computeReferenceResults( + const core::PlanNodePtr& plan, const std::vector& probeInput, - const std::vector& buildInput, - const core::PlanNodePtr& plan); + const std::vector& buildInput); // Generates and executes plans using NestedLoopJoin without filters. The // result is compared to DuckDB. Returns the result vector of the cross @@ -298,10 +300,14 @@ class JoinFuzzer { exec::MemoryReclaimer::create())}; VectorFuzzer vectorFuzzer_; + std::unique_ptr referenceQueryRunner_; }; -JoinFuzzer::JoinFuzzer(size_t initialSeed) - : vectorFuzzer_{getFuzzerOptions(), pool_.get()} { +JoinFuzzer::JoinFuzzer( + size_t initialSeed, + std::unique_ptr referenceQueryRunner) + : vectorFuzzer_{getFuzzerOptions(), pool_.get()}, + referenceQueryRunner_{std::move(referenceQueryRunner)} { filesystems::registerLocalFileSystem(); // Make sure not to run out of open file descriptors. 
@@ -610,11 +616,10 @@ core::PlanNodePtr tryFlipJoinSides(const core::NestedLoopJoinNode& joinNode) { joinNode.outputType()); } -template -std::optional JoinFuzzer::computeDuckDbResult( +std::optional JoinFuzzer::computeReferenceResults( + const core::PlanNodePtr& plan, const std::vector& probeInput, - const std::vector& buildInput, - const core::PlanNodePtr& plan) { + const std::vector& buildInput) { if (containsUnsupportedTypes(probeInput[0]->type())) { return std::nullopt; } @@ -623,111 +628,13 @@ std::optional JoinFuzzer::computeDuckDbResult( return std::nullopt; } - DuckDbQueryRunner queryRunner; - queryRunner.createTable("t", probeInput); - queryRunner.createTable("u", buildInput); - - auto* joinNode = dynamic_cast(plan.get()); - VELOX_CHECK_NOT_NULL(joinNode); - - const auto joinKeysToSql = [](auto keys) { - std::stringstream out; - for (auto i = 0; i < keys.size(); ++i) { - if (i > 0) { - out << ", "; - } - out << keys[i]->name(); - } - return out.str(); - }; - - const auto& outputNames = plan->outputType()->names(); - std::stringstream sql; - - if constexpr (std::is_same_v) { - const auto equiClausesToSql = [](auto joinNode) { - std::stringstream out; - for (auto i = 0; i < joinNode->leftKeys().size(); ++i) { - if (i > 0) { - out << " AND "; - } - out << joinNode->leftKeys()[i]->name() << " = " - << joinNode->rightKeys()[i]->name(); - } - return out.str(); - }; - - if (joinNode->isLeftSemiProjectJoin()) { - sql << "SELECT " - << folly::join(", ", outputNames.begin(), --outputNames.end()); - } else { - sql << "SELECT " << folly::join(", ", outputNames); - } - - switch (joinNode->joinType()) { - case core::JoinType::kInner: - sql << " FROM t INNER JOIN u ON " << equiClausesToSql(joinNode); - break; - case core::JoinType::kLeft: - sql << " FROM t LEFT JOIN u ON " << equiClausesToSql(joinNode); - break; - case core::JoinType::kFull: - sql << " FROM t FULL OUTER JOIN u ON " << equiClausesToSql(joinNode); - break; - case core::JoinType::kLeftSemiFilter: - if 
(joinNode->leftKeys().size() > 1) { - return std::nullopt; - } - sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys()) - << " IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) - << " FROM u)"; - break; - case core::JoinType::kLeftSemiProject: - if (joinNode->isNullAware()) { - sql << ", " << joinKeysToSql(joinNode->leftKeys()) << " IN (SELECT " - << joinKeysToSql(joinNode->rightKeys()) << " FROM u) FROM t"; - } else { - sql << ", EXISTS (SELECT * FROM u WHERE " - << equiClausesToSql(joinNode) << ") FROM t"; - } - break; - case core::JoinType::kAnti: - if (joinNode->isNullAware()) { - sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys()) - << " NOT IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) - << " FROM u)"; - } else { - sql << " FROM t WHERE NOT EXISTS (SELECT * FROM u WHERE " - << equiClausesToSql(joinNode) << ")"; - } - break; - default: - VELOX_UNREACHABLE( - "Unknown join type: {}", static_cast(joinNode->joinType())); - } - } else { - // Nested loop join without filter. 
- VELOX_CHECK( - joinNode->joinCondition() == nullptr, - "This code path should be called only for nested loop join without filter"); - const std::string joinCondition{"(1 = 1)"}; - switch (joinNode->joinType()) { - case core::JoinType::kInner: - sql << " FROM t INNER JOIN u ON " << joinCondition; - break; - case core::JoinType::kLeft: - sql << " FROM t LEFT JOIN u ON " << joinCondition; - break; - case core::JoinType::kFull: - sql << " FROM t FULL OUTER JOIN u ON " << joinCondition; - break; - default: - VELOX_UNREACHABLE( - "Unknown join type: {}", static_cast(joinNode->joinType())); - } + if (auto sql = referenceQueryRunner_->toSql(plan)) { + return referenceQueryRunner_->execute( + sql.value(), probeInput, buildInput, plan->outputType()); } - return queryRunner.execute(sql.str(), plan->outputType()); + LOG(INFO) << "Query not supported by the reference DB"; + return std::nullopt; } std::vector fieldNames( @@ -1021,13 +928,14 @@ RowVectorPtr JoinFuzzer::testCrossProduct( /*withFilter*/ false); const auto expected = execute(plan, /*injectSpill=*/false); - // If OOM injection is not enabled verify the results against DuckDB. + // If OOM injection is not enabled verify the results against Reference query + // runner. if (!FLAGS_enable_oom_injection) { - if (auto duckDbResult = computeDuckDbResult( - probeInput, buildInput, plan.plan)) { + if (auto referenceResult = + computeReferenceResults(plan.plan, probeInput, buildInput)) { VELOX_CHECK( assertEqualResults( - duckDbResult.value(), plan.plan->outputType(), {expected}), + referenceResult.value(), plan.plan->outputType(), {expected}), - "Velox and DuckDB results don't match"); + "Velox and Reference results don't match"); } } @@ -1153,14 +1061,17 @@ void JoinFuzzer::verify(core::JoinType joinType) { const auto expected = execute(defaultPlan, /*injectSpill=*/false); - // If OOM injection is not enabled verify the results against DuckDB. + // If OOM injection is not enabled verify the results against Reference query + // runner. 
if (!FLAGS_enable_oom_injection) { - if (auto duckDbResult = computeDuckDbResult( - probeInput, buildInput, defaultPlan.plan)) { + if (auto referenceResult = + computeReferenceResults(defaultPlan.plan, probeInput, buildInput)) { VELOX_CHECK( assertEqualResults( - duckDbResult.value(), defaultPlan.plan->outputType(), {expected}), - "Velox and DuckDB results don't match"); + referenceResult.value(), + defaultPlan.plan->outputType(), + {expected}), + "Velox and Reference results don't match"); } } @@ -1548,7 +1459,9 @@ void JoinFuzzer::go() { } // namespace -void joinFuzzer(size_t seed) { - JoinFuzzer(seed).go(); +void joinFuzzer( + size_t seed, + std::unique_ptr referenceQueryRunner) { + JoinFuzzer(seed, std::move(referenceQueryRunner)).go(); } } // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/JoinFuzzer.h b/velox/exec/fuzzer/JoinFuzzer.h index 578522d627c71..01a5085d3876b 100644 --- a/velox/exec/fuzzer/JoinFuzzer.h +++ b/velox/exec/fuzzer/JoinFuzzer.h @@ -16,7 +16,10 @@ #pragma once #include +#include "velox/exec/fuzzer/ReferenceQueryRunner.h" namespace facebook::velox::exec::test { -void joinFuzzer(size_t seed); +void joinFuzzer( + size_t seed, + std::unique_ptr referenceQueryRunner); } diff --git a/velox/exec/fuzzer/JoinFuzzerRunner.h b/velox/exec/fuzzer/JoinFuzzerRunner.h index 3ab915c9924f0..195aab5b292a8 100644 --- a/velox/exec/fuzzer/JoinFuzzerRunner.h +++ b/velox/exec/fuzzer/JoinFuzzerRunner.h @@ -21,6 +21,7 @@ #include "velox/common/memory/SharedArbitrator.h" #include "velox/exec/MemoryReclaimer.h" #include "velox/exec/fuzzer/JoinFuzzer.h" +#include "velox/exec/fuzzer/ReferenceQueryRunner.h" #include "velox/functions/prestosql/registration/RegistrationFunctions.h" #include "velox/parse/TypeResolver.h" #include "velox/serializers/PrestoSerializer.h" @@ -57,33 +58,20 @@ /// --seed 123 \ /// --v=1 +namespace facebook::velox::exec::test { + class JoinFuzzerRunner { public: - static int run(size_t seed) { - setupMemory(); - 
facebook::velox::serializer::presto::PrestoVectorSerde:: - registerVectorSerde(); - facebook::velox::filesystems::registerLocalFileSystem(); - facebook::velox::functions::prestosql::registerAllScalarFunctions(); - facebook::velox::parse::registerTypeResolver(); - - facebook::velox::exec::test::joinFuzzer(seed); + static int run( + size_t seed, + std::unique_ptr referenceQueryRunner) { + serializer::presto::PrestoVectorSerde::registerVectorSerde(); + filesystems::registerLocalFileSystem(); + functions::prestosql::registerAllScalarFunctions(); + parse::registerTypeResolver(); + joinFuzzer(seed, std::move(referenceQueryRunner)); return RUN_ALL_TESTS(); } - - private: - // Invoked to set up memory system with arbitration. - static void setupMemory() { - FLAGS_velox_enable_memory_usage_track_in_default_memory_pool = true; - FLAGS_velox_memory_leak_check_enabled = true; - facebook::velox::memory::SharedArbitrator::registerFactory(); - facebook::velox::memory::MemoryManagerOptions options; - options.allocatorCapacity = 8L << 30; - options.arbitratorCapacity = 6L << 30; - options.arbitratorKind = "SHARED"; - options.checkUsageLeak = true; - options.arbitrationStateCheckCb = - facebook::velox::exec::memoryArbitrationStateCheck; - facebook::velox::memory::MemoryManager::initialize(options); - } }; + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/PrestoQueryRunner.cpp b/velox/exec/fuzzer/PrestoQueryRunner.cpp index 26998890c1bfe..68ad29d983baf 100644 --- a/velox/exec/fuzzer/PrestoQueryRunner.cpp +++ b/velox/exec/fuzzer/PrestoQueryRunner.cpp @@ -78,6 +78,28 @@ RowVectorPtr deserialize( return result; } +RowVectorPtr makeNullRows( + const std::vector& input, + const std::string& colName, + memory::MemoryPool* pool) { + // The query doesn't need to read any columns, but it needs to see a + // specific number of rows. Make new 'input' as single all-null BIGINT + // column with as many rows as original input. 
This way we'll be able to + // create a temporary test table with the necessary number of rows. + vector_size_t numInput = 0; + for (const auto& v : input) { + numInput += v->size(); + } + + auto column = BaseVector::createNullConstant(BIGINT(), numInput, pool); + return std::make_shared( + pool, + ROW({colName}, {BIGINT()}), + nullptr, + numInput, + std::vector{column}); +} + class ServerResponse { public: explicit ServerResponse(const std::string& responseJson) @@ -176,6 +198,16 @@ std::optional PrestoQueryRunner::toSql( return toSql(tableWriteNode); } + if (const auto joinNode = + std::dynamic_pointer_cast(plan)) { + return toSql(joinNode); + } + + if (const auto joinNode = + std::dynamic_pointer_cast(plan)) { + return toSql(joinNode); + } + VELOX_NYI(); } @@ -579,6 +611,133 @@ std::optional PrestoQueryRunner::toSql( return sql.str(); } +std::optional PrestoQueryRunner::toSql( + const std::shared_ptr& joinNode) { + if (!isSupportedDwrfType(joinNode->sources()[0]->outputType())) { + return std::nullopt; + } + + if (!isSupportedDwrfType(joinNode->sources()[1]->outputType())) { + return std::nullopt; + } + + const auto joinKeysToSql = [](auto keys) { + std::stringstream out; + for (auto i = 0; i < keys.size(); ++i) { + if (i > 0) { + out << ", "; + } + out << keys[i]->name(); + } + return out.str(); + }; + + const auto equiClausesToSql = [](auto joinNode) { + std::stringstream out; + for (auto i = 0; i < joinNode->leftKeys().size(); ++i) { + if (i > 0) { + out << " AND "; + } + out << joinNode->leftKeys()[i]->name() << " = " + << joinNode->rightKeys()[i]->name(); + } + return out.str(); + }; + + const auto& outputNames = joinNode->outputType()->names(); + + std::stringstream sql; + if (joinNode->isLeftSemiProjectJoin()) { + sql << "SELECT " + << folly::join(", ", outputNames.begin(), --outputNames.end()); + } else { + sql << "SELECT " << folly::join(", ", outputNames); + } + + switch (joinNode->joinType()) { + case core::JoinType::kInner: + sql << " FROM t INNER 
JOIN u ON " << equiClausesToSql(joinNode); + break; + case core::JoinType::kLeft: + sql << " FROM t LEFT JOIN u ON " << equiClausesToSql(joinNode); + break; + case core::JoinType::kFull: + sql << " FROM t FULL OUTER JOIN u ON " << equiClausesToSql(joinNode); + break; + case core::JoinType::kLeftSemiFilter: + if (joinNode->leftKeys().size() > 1) { + return std::nullopt; + } + sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys()) + << " IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) + << " FROM u)"; + break; + case core::JoinType::kLeftSemiProject: + if (joinNode->isNullAware()) { + sql << ", " << joinKeysToSql(joinNode->leftKeys()) << " IN (SELECT " + << joinKeysToSql(joinNode->rightKeys()) << " FROM u) FROM t"; + } else { + sql << ", EXISTS (SELECT * FROM u WHERE " << equiClausesToSql(joinNode) + << ") FROM t"; + } + break; + case core::JoinType::kAnti: + if (joinNode->isNullAware()) { + sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys()) + << " NOT IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) + << " FROM u)"; + } else { + sql << " FROM t WHERE NOT EXISTS (SELECT * FROM u WHERE " + << equiClausesToSql(joinNode) << ")"; + } + break; + default: + VELOX_UNREACHABLE( + "Unknown join type: {}", static_cast(joinNode->joinType())); + } + + return sql.str(); +} + +std::optional PrestoQueryRunner::toSql( + const std::shared_ptr& joinNode) { + const auto& joinKeysToSql = [](auto keys) { + std::stringstream out; + for (auto i = 0; i < keys.size(); ++i) { + if (i > 0) { + out << ", "; + } + out << keys[i]->name(); + } + return out.str(); + }; + + const auto& outputNames = joinNode->outputType()->names(); + std::stringstream sql; + sql << "SELECT " << folly::join(", ", outputNames); + // Nested loop join without filter. 
+ VELOX_CHECK( + joinNode->joinCondition() == nullptr, + "This code path should be called only for nested loop join without filter"); + const std::string joinCondition{"(1 = 1)"}; + switch (joinNode->joinType()) { + case core::JoinType::kInner: + sql << " FROM t INNER JOIN u ON " << joinCondition; + break; + case core::JoinType::kLeft: + sql << " FROM t LEFT JOIN u ON " << joinCondition; + break; + case core::JoinType::kFull: + sql << " FROM t FULL OUTER JOIN u ON " << joinCondition; + break; + default: + VELOX_UNREACHABLE( + "Unknown join type: {}", static_cast(joinNode->joinType())); + } + + return sql.str(); +} + std::multiset> PrestoQueryRunner::execute( const std::string& sql, const std::vector& input, @@ -586,34 +745,19 @@ std::multiset> PrestoQueryRunner::execute( return exec::test::materialize(executeVector(sql, input, resultType)); } -std::vector PrestoQueryRunner::executeVector( +std::multiset> PrestoQueryRunner::execute( const std::string& sql, - const std::vector& input, - const velox::RowTypePtr& resultType) { - auto inputType = asRowType(input[0]->type()); - if (inputType->size() == 0) { - // The query doesn't need to read any columns, but it needs to see a - // specific number of rows. Make new 'input' as single all-null BIGINT - // column with as many rows as original input. This way we'll be able to - // create a 'tmp' table will the necessary number of rows. - vector_size_t numInput = 0; - for (const auto& v : input) { - numInput += v->size(); - } - - auto column = BaseVector::createNullConstant(BIGINT(), numInput, pool()); - auto rowVector = std::make_shared( - pool(), - ROW({"x"}, {BIGINT()}), - nullptr, - numInput, - std::vector{column}); - return executeVector(sql, {rowVector}, resultType); - } - - // Create tmp table in Presto using DWRF file format and add a single - // all-null row to it. 
+ const std::vector& probeInput, + const std::vector& buildInput, + const RowTypePtr& resultType) { + return exec::test::materialize( + executeVector(sql, probeInput, buildInput, resultType)); +} +std::string PrestoQueryRunner::createTable( + const std::string& name, + const std::vector& data) { + auto inputType = asRowType(data[0]->type()); std::stringstream nullValues; for (auto i = 0; i < inputType->size(); ++i) { appendComma(i, nullValues); @@ -621,21 +765,78 @@ std::vector PrestoQueryRunner::executeVector( "cast(null as {})", toTypeSql(inputType->childAt(i))); } - execute("DROP TABLE IF EXISTS tmp"); + execute(fmt::format("DROP TABLE IF EXISTS {}", name)); execute(fmt::format( - "CREATE TABLE tmp({}) WITH (format = 'DWRF') AS SELECT {}", + "CREATE TABLE {}({}) WITH (format = 'DWRF') AS SELECT {}", + name, folly::join(", ", inputType->names()), nullValues.str())); // Query Presto to find out table's location on disk. - auto results = execute("SELECT \"$path\" FROM tmp"); + auto results = execute(fmt::format("SELECT \"$path\" FROM {}", name)); auto filePath = extractSingleValue(results); auto tableDirectoryPath = fs::path(filePath).parent_path(); // Delete the all-null row. 
- execute("DELETE FROM tmp"); + execute(fmt::format("DELETE FROM {}", name)); + + return tableDirectoryPath; +} + +std::vector PrestoQueryRunner::executeVector( + const std::string& sql, + const std::vector& probeInput, + const std::vector& buildInput, + const velox::RowTypePtr& resultType) { + auto probeType = asRowType(probeInput[0]->type()); + if (probeType->size() == 0) { + auto rowVector = makeNullRows(probeInput, "x", pool()); + return executeVector(sql, {rowVector}, buildInput, resultType); + } + + auto buildType = asRowType(buildInput[0]->type()); + if (buildType->size() == 0) { + auto rowVector = makeNullRows(buildInput, "y", pool()); + return executeVector(sql, probeInput, {rowVector}, resultType); + } + + // Create probe/build table in Presto using DWRF file format and add a single + // all-null row to it. + auto probeTableDirectoryPath = createTable("t", probeInput); + auto buildTableDirectoryPath = createTable("u", buildInput); + + // Create a new file in table's directory with fuzzer-generated data. + auto probeFilePath = fs::path(probeTableDirectoryPath) + .append("probe.dwrf") + .string() + .substr(strlen("file:")); + + auto buildFilePath = fs::path(buildTableDirectoryPath) + .append("build.dwrf") + .string() + .substr(strlen("file:")); + + auto writerPool = rootPool()->addAggregateChild("writer"); + writeToFile(probeFilePath, probeInput, writerPool.get()); + writeToFile(buildFilePath, buildInput, writerPool.get()); + + // Run the query. + return execute(sql); +} + +std::vector PrestoQueryRunner::executeVector( + const std::string& sql, + const std::vector& input, + const velox::RowTypePtr& resultType) { + auto inputType = asRowType(input[0]->type()); + if (inputType->size() == 0) { + auto rowVector = makeNullRows(input, "x", pool()); + return executeVector(sql, {rowVector}, resultType); + } + + auto tableDirectoryPath = createTable("tmp", input); // Create a new file in table's directory with fuzzer-generated data. 
auto newFilePath = fs::path(tableDirectoryPath) diff --git a/velox/exec/fuzzer/PrestoQueryRunner.h b/velox/exec/fuzzer/PrestoQueryRunner.h index 21641c70f31a4..a27dadab288bc 100644 --- a/velox/exec/fuzzer/PrestoQueryRunner.h +++ b/velox/exec/fuzzer/PrestoQueryRunner.h @@ -63,6 +63,12 @@ class PrestoQueryRunner : public velox::exec::test::ReferenceQueryRunner { const std::vector& input, const velox::RowTypePtr& resultType) override; + std::multiset> execute( + const std::string& sql, + const std::vector& probeInput, + const std::vector& buildInput, + const RowTypePtr& resultType) override; + /// Executes Presto SQL query and returns the results. Tables referenced by /// the query must already exist. std::vector execute(const std::string& sql) override; @@ -74,6 +80,12 @@ class PrestoQueryRunner : public velox::exec::test::ReferenceQueryRunner { const std::vector& input, const RowTypePtr& resultType) override; + std::vector executeVector( + const std::string& sql, + const std::vector& probeInput, + const std::vector& buildInput, + const RowTypePtr& resultType) override; + private: velox::memory::MemoryPool* rootPool() { return rootPool_.get(); @@ -99,10 +111,22 @@ class PrestoQueryRunner : public velox::exec::test::ReferenceQueryRunner { std::optional toSql( const std::shared_ptr& tableWriteNode); + std::optional toSql( + const std::shared_ptr& joinNode); + + std::optional toSql( + const std::shared_ptr& joinNode); + std::string startQuery(const std::string& sql); std::string fetchNext(const std::string& nextUri); + /// Create an empty table with given data type and table name. The function + /// returns the root directory of table files. 
+ std::string createTable( + const std::string& name, + const std::vector& data); + const std::string coordinatorUri_; const std::string user_; const std::chrono::milliseconds timeout_; diff --git a/velox/exec/fuzzer/ReferenceQueryRunner.h b/velox/exec/fuzzer/ReferenceQueryRunner.h index 987bc8101d2ae..fb405fab487e9 100644 --- a/velox/exec/fuzzer/ReferenceQueryRunner.h +++ b/velox/exec/fuzzer/ReferenceQueryRunner.h @@ -36,6 +36,15 @@ class ReferenceQueryRunner { const std::vector& input, const RowTypePtr& resultType) = 0; + /// Executes SQL query returned by the 'toSql' method using 'probeInput' and + /// 'buildInput' data for join node. + /// Converts results using 'resultType' schema. + virtual std::multiset> execute( + const std::string& sql, + const std::vector& probeInput, + const std::vector& buildInput, + const RowTypePtr& resultType) = 0; + /// Returns true if 'executeVector' can be called to get results as Velox /// Vector. virtual bool supportsVeloxVectorResults() const { @@ -51,6 +60,15 @@ class ReferenceQueryRunner { VELOX_UNSUPPORTED(); } + /// Similar to above but for join node with 'probeInput' and 'buildInput'. 
+ virtual std::vector executeVector( + const std::string& sql, + const std::vector& probeInput, + const std::vector& buildInput, + const RowTypePtr& resultType) { + VELOX_UNSUPPORTED(); + } + virtual std::vector execute(const std::string& sql) { VELOX_UNSUPPORTED(); } diff --git a/velox/exec/tests/JoinFuzzerTest.cpp b/velox/exec/tests/JoinFuzzerTest.cpp index 32f8beae42ee5..fdee09d878c61 100644 --- a/velox/exec/tests/JoinFuzzerTest.cpp +++ b/velox/exec/tests/JoinFuzzerTest.cpp @@ -19,7 +19,11 @@ #include #include +#include "velox/exec/fuzzer/DuckQueryRunner.h" +#include "velox/exec/fuzzer/FuzzerUtil.h" #include "velox/exec/fuzzer/JoinFuzzerRunner.h" +#include "velox/exec/fuzzer/PrestoQueryRunner.h" +#include "velox/exec/fuzzer/ReferenceQueryRunner.h" DEFINE_int64( seed, @@ -27,6 +31,44 @@ DEFINE_int64( "Initial seed for random number generator used to reproduce previous " "results (0 means start with random seed)."); +DEFINE_string( + presto_url, + "", + "Presto coordinator URI along with port. If set, we use Presto " + "source of truth. Otherwise, use DuckDB. Example: " + "--presto_url=http://127.0.0.1:8080"); + +DEFINE_uint32( + req_timeout_ms, + 1000, + "Timeout in milliseconds for HTTP requests made to reference DB, " + "such as Presto. 
Example: --req_timeout_ms=2000"); + +DEFINE_int64(allocator_capacity, 8L << 30, "Allocator capacity in bytes."); + +DEFINE_int64(arbitrator_capacity, 6L << 30, "Arbitrator capacity in bytes."); + +using namespace facebook::velox::exec; + +namespace { +std::unique_ptr setupReferenceQueryRunner( + const std::string& prestoUrl, + const std::string& runnerName, + const uint32_t& reqTimeoutMs) { + if (prestoUrl.empty()) { + auto duckQueryRunner = std::make_unique(); + LOG(INFO) << "Using DuckDB as the reference DB."; + return duckQueryRunner; + } + + LOG(INFO) << "Using Presto as the reference DB."; + return std::make_unique( + prestoUrl, + runnerName, + static_cast(reqTimeoutMs)); +} +} // namespace + int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); @@ -34,7 +76,10 @@ int main(int argc, char** argv) { // singletons, installing proper signal handlers for better debugging // experience, and initialize glog and gflags. folly::Init init(&argc, &argv); - - size_t initialSeed = FLAGS_seed == 0 ? std::time(nullptr) : FLAGS_seed; - return JoinFuzzerRunner::run(initialSeed); + test::setupMemory(FLAGS_allocator_capacity, FLAGS_arbitrator_capacity); + auto referenceQueryRunner = setupReferenceQueryRunner( + FLAGS_presto_url, "join_fuzzer", FLAGS_req_timeout_ms); + const size_t initialSeed = FLAGS_seed == 0 ? 
std::time(nullptr) : FLAGS_seed; + return test::JoinFuzzerRunner::run( + initialSeed, std::move(referenceQueryRunner)); } diff --git a/velox/exec/tests/MemoryArbitrationFuzzerTest.cpp b/velox/exec/tests/MemoryArbitrationFuzzerTest.cpp index 5a3c337e7851c..1ad0247fb8878 100644 --- a/velox/exec/tests/MemoryArbitrationFuzzerTest.cpp +++ b/velox/exec/tests/MemoryArbitrationFuzzerTest.cpp @@ -21,6 +21,7 @@ #include "velox/common/memory/SharedArbitrator.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/exec/MemoryReclaimer.h" +#include "velox/exec/fuzzer/FuzzerUtil.h" #include "velox/exec/fuzzer/MemoryArbitrationFuzzerRunner.h" #include "velox/exec/fuzzer/PrestoQueryRunner.h" #include "velox/exec/fuzzer/ReferenceQueryRunner.h" @@ -37,22 +38,6 @@ DEFINE_int64( using namespace facebook::velox::exec; -namespace { -// Invoked to set up memory system with arbitration. -void setupMemory() { - FLAGS_velox_enable_memory_usage_track_in_default_memory_pool = true; - FLAGS_velox_memory_leak_check_enabled = true; - facebook::velox::memory::SharedArbitrator::registerFactory(); - facebook::velox::memory::MemoryManagerOptions options; - options.allocatorCapacity = FLAGS_allocator_capacity; - options.arbitratorCapacity = FLAGS_arbitrator_capacity; - options.arbitratorKind = "SHARED"; - options.checkUsageLeak = true; - options.arbitrationStateCheckCb = memoryArbitrationStateCheck; - facebook::velox::memory::MemoryManager::initialize(options); -} -} // namespace - int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); @@ -60,7 +45,7 @@ int main(int argc, char** argv) { // singletons, installing proper signal handlers for better debugging // experience, and initialize glog and gflags. folly::Init init(&argc, &argv); - setupMemory(); + test::setupMemory(FLAGS_allocator_capacity, FLAGS_arbitrator_capacity); const size_t initialSeed = FLAGS_seed == 0 ? 
std::time(nullptr) : FLAGS_seed; return test::MemoryArbitrationFuzzerRunner::run(initialSeed); } diff --git a/velox/exec/tests/RowNumberFuzzerTest.cpp b/velox/exec/tests/RowNumberFuzzerTest.cpp index 3abdc9fd3e767..ce8317c0a9a2c 100644 --- a/velox/exec/tests/RowNumberFuzzerTest.cpp +++ b/velox/exec/tests/RowNumberFuzzerTest.cpp @@ -22,6 +22,7 @@ #include "velox/connectors/hive/HiveConnector.h" #include "velox/exec/MemoryReclaimer.h" #include "velox/exec/fuzzer/DuckQueryRunner.h" +#include "velox/exec/fuzzer/FuzzerUtil.h" #include "velox/exec/fuzzer/PrestoQueryRunner.h" #include "velox/exec/fuzzer/ReferenceQueryRunner.h" #include "velox/exec/fuzzer/RowNumberFuzzerRunner.h" @@ -45,6 +46,10 @@ DEFINE_uint32( "Timeout in milliseconds for HTTP requests made to reference DB, " "such as Presto. Example: --req_timeout_ms=2000"); +DEFINE_int64(allocator_capacity, 8L << 30, "Allocator capacity in bytes."); + +DEFINE_int64(arbitrator_capacity, 6L << 30, "Arbitrator capacity in bytes."); + using namespace facebook::velox::exec; namespace { @@ -64,20 +69,6 @@ std::unique_ptr setupReferenceQueryRunner( runnerName, static_cast(reqTimeoutMs)); } - -// Invoked to set up memory system with arbitration. -void setupMemory() { - FLAGS_velox_enable_memory_usage_track_in_default_memory_pool = true; - FLAGS_velox_memory_leak_check_enabled = true; - facebook::velox::memory::SharedArbitrator::registerFactory(); - facebook::velox::memory::MemoryManagerOptions options; - options.allocatorCapacity = 8L << 30; - options.arbitratorCapacity = 6L << 30; - options.arbitratorKind = "SHARED"; - options.checkUsageLeak = true; - options.arbitrationStateCheckCb = memoryArbitrationStateCheck; - facebook::velox::memory::MemoryManager::initialize(options); -} } // namespace int main(int argc, char** argv) { @@ -87,7 +78,7 @@ int main(int argc, char** argv) { // singletons, installing proper signal handlers for better debugging // experience, and initialize glog and gflags. 
folly::Init init(&argc, &argv); - setupMemory(); + test::setupMemory(FLAGS_allocator_capacity, FLAGS_arbitrator_capacity); auto referenceQueryRunner = setupReferenceQueryRunner( FLAGS_presto_url, "row_number_fuzzer", FLAGS_req_timeout_ms); const size_t initialSeed = FLAGS_seed == 0 ? std::time(nullptr) : FLAGS_seed;